using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using Taobao.Top.Link.Channel;
using Taobao.Top.Link.Endpoints;
using Taobao.Top.Link.Util;
using Top.Api;
using Top.Api.Util;
namespace Top.Tmc
{
    /// <summary>Message service client.</summary>
    public class TmcClient
    {
        // Names of connection headers sent to the TMC server by doConnect.
        // GROUP_NAME is part of the signed parameter set; SDK and INTRANET_IP are added after signing.
        private const string GROUP_NAME = "group_name";
        private const string SDK = "sdk";
        private const string INTRANET_IP = "intranet_ip";
        // Client identity built from the app key and group name passed to the constructor.
        private TmcClientIdentity _id;
        // App secret; handed to TopUtils.SignTopRequest when the connection headers are signed.
        private string _appSecret;
        // URI of the most recent doConnect call that completed; reused by the reconnect timer.
        private string _uri;
        // Local endpoint created in PrepareEndpoint; raises the OnMessage / OnAckMessage callbacks.
        private Endpoint _endpoint;
        // Proxy returned by _endpoint.GetEndpoint in doConnect; every outgoing message goes through it.
        private EndpointProxy _serverProxy;
        // Set to true by Connect; while false, received messages are logged and dropped instead of dispatched.
        private volatile bool running;
        private int _heartbeatInterval = 45000; // heartbeat interval (unit: milliseconds)
        private int _reconnectIntervalSeconds = 15; // reconnect period (unit: seconds)
        private int _pullRequestIntervalSeconds = 30; // period of the scheduled message pull (unit: seconds)
        // Periodically checks the server proxy and reconnects when it has no valid sender.
        private Timer _reconnectTimer;
        // Periodically sends a pull request while the server proxy has a valid sender.
        private Timer _pullRequestTimer;
        /// <summary>
        /// Raised for each message received from the endpoint. Handlers are invoked on a thread-pool thread.
        /// </summary>
        public event EventHandler OnMessage;
        /// <summary>
        /// Gets or sets the logger used by this client. PrepareEndpoint initializes it to
        /// <c>Top.Api.Log.Instance</c>.
        /// </summary>
        public ITopLogger Log { get; set; }
        // Backing field for EnableTraceLog; tracing is on by default.
        private bool enableTraceLog = true;
        /// <summary>
        /// Gets or sets whether each received message and each automatic confirmation
        /// is written to the log at Info level.
        /// </summary>
        public bool EnableTraceLog
        {
            get { return this.enableTraceLog;}
            set { this.enableTraceLog = value; }
        }
        /// <summary>
        /// Gets or sets the period, in seconds, at which pull requests are sent to the server.
        /// </summary>
        /// <remarks>
        /// When the pull timer already exists, assigning a value re-arms it so that both
        /// its next due time and its period equal the new value.
        /// </remarks>
        public int PullRequestIntervalSeconds
        {
            get
            {
                return this._pullRequestIntervalSeconds;
            }
            set
            {
                this._pullRequestIntervalSeconds = value;

                // Timer not created yet: the new value is picked up when it is started.
                if (this._pullRequestTimer == null)
                {
                    return;
                }

                TimeSpan interval = TimeSpan.FromSeconds(value);
                this._pullRequestTimer.Change(interval, interval);
            }
        }
        /// <summary>
        /// Gets or sets the interval, in seconds, between automatic reconnect checks.
        /// </summary>
        /// <remarks>
        /// When the reconnect timer already exists, assigning a value re-arms it so that both
        /// its next due time and its period equal the new value.
        /// </remarks>
        public int ReconnectIntervalSeconds
        {
            get
            {
                return this._reconnectIntervalSeconds;
            }
            set
            {
                this._reconnectIntervalSeconds = value;

                // Timer not created yet: the new value is picked up when it is started.
                if (this._reconnectTimer == null)
                {
                    return;
                }

                TimeSpan interval = TimeSpan.FromSeconds(value);
                this._reconnectTimer.Change(interval, interval);
            }
        }
        /// <summary>
        /// Initializes a TMC client that uses the group name "default".
        /// </summary>
        /// <param name="appKey">Application key.</param>
        /// <param name="appSecret">Application secret.</param>
        public TmcClient(string appKey, string appSecret)
            : this(appKey, appSecret, "default")
        {
        }
        /// <summary>
        /// Initializes a TMC client for the given application and message group.
        /// The endpoint is prepared here, but no connection is opened until <c>Connect</c> is called.
        /// </summary>
        /// <param name="appKey">Application key; forms the client identity together with <paramref name="groupName"/>.</param>
        /// <param name="appSecret">Application secret; used to sign the connection headers.</param>
        /// <param name="groupName">Name of the message group this client belongs to.</param>
        public TmcClient(string appKey, string appSecret, string groupName)
        {
            var identity = new TmcClientIdentity(appKey, groupName);

            this._appSecret = appSecret;
            this._id = identity;

            // Must run after _id is assigned: the endpoint is constructed with it.
            this.PrepareEndpoint();
        }
        /// <summary>
        /// Connects to the TMC server and starts the reconnect and pull-request timers.
        /// </summary>
        /// <param name="uri">TMC server address, e.g. ws://mc.api.taobao.com/</param>
        public void Connect(string uri)
        {
            // Flag the client as running first so messages arriving right after the
            // connection is established are dispatched rather than dropped.
            this.running = true;

            this.doConnect(uri);

            // Each Start* call is a no-op when its timer already exists.
            this.StartReconnect();
            this.StartPullRequest();
        }
        /// <summary>
        /// Builds the signed connection headers and obtains a proxy to the TMC server at
        /// <paramref name="uri"/>. On success the proxy and the URI are stored on this instance.
        /// </summary>
        /// <param name="uri">Address of the TMC server to connect to.</param>
        private void doConnect(string uri)
        {
            // NOTE(review): the dictionaries were declared as `new Dictionary()`, which has no
            // type arguments and does not compile. <string, string> is inferred from the values
            // added below — confirm against TopUtils.SignTopRequest and Endpoint.GetEndpoint.

            // Parameters that take part in the signature.
            var signHeader = new Dictionary<string, string>
            {
                { Constants.APP_KEY, this._id.AppKey },
                { GROUP_NAME, this._id.GroupName },
                { Constants.TIMESTAMP, DateTime.Now.Ticks.ToString() }
            };

            // Connection headers = signed parameters + their signature + unsigned extra fields.
            var connHeader = new Dictionary<string, string>(signHeader);
            connHeader.Add(Constants.SIGN, TopUtils.SignTopRequest(signHeader, this._appSecret, Constants.SIGN_METHOD_MD5));
            //extra fields
            connHeader.Add(SDK, Constants.SDK_VERSION);
            connHeader.Add(INTRANET_IP, TopUtils.GetIntranetIp());

            this._serverProxy = this._endpoint.GetEndpoint(new TmcServerIdentity(), uri, connHeader);
            // Remember the address only after the proxy was obtained; the reconnect timer reuses it.
            this._uri = uri;
            this.Log.Info("connected to tmc server: {0}", uri);
        }
        /// <summary>
        /// Publishes a message that is not bound to a user to the given topic and waits for the send to complete.
        /// </summary>
        /// <param name="topic">Topic name.</param>
        /// <param name="content">Message content (JSON/XML), strictly following the topic definition.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="topic"/> or <paramref name="content"/> is null or empty.
        /// </exception>
        public void Send(string topic, string content)
        {
            if (string.IsNullOrEmpty(topic))
                throw new ArgumentNullException("topic");
            if (string.IsNullOrEmpty(content))
                throw new ArgumentNullException("content");

            // NOTE(review): originally `new Dictionary()`, which has no type arguments and does
            // not compile. <string, object> is inferred from the mixed value types added here —
            // confirm against EndpointProxy.SendAndWait.
            var msg = new Dictionary<string, object>
            {
                { MessageFields.KIND, MessageKind.Data },
                { MessageFields.DATA_TOPIC, topic },
                { MessageFields.DATA_CONTENT, content }
            };

            // 2000 is the wait limit passed to SendAndWait (presumably milliseconds — TODO confirm).
            this._serverProxy.SendAndWait(msg, 2000);
        }
        /// <summary>
        /// Publishes a user-related message to the given topic and waits for the send to complete.
        /// </summary>
        /// <param name="topic">Topic name.</param>
        /// <param name="content">Message content (JSON/XML), strictly following the topic definition.</param>
        /// <param name="session">User authorization code (session).</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="topic"/>, <paramref name="content"/> or <paramref name="session"/> is null or empty.
        /// </exception>
        public void Send(string topic, string content, string session)
        {
            if (string.IsNullOrEmpty(topic))
                throw new ArgumentNullException("topic");
            if (string.IsNullOrEmpty(content))
                throw new ArgumentNullException("content");
            if (string.IsNullOrEmpty(session))
                throw new ArgumentNullException("session");

            // NOTE(review): originally `new Dictionary()`, which has no type arguments and does
            // not compile. <string, object> is inferred from the mixed value types added here —
            // confirm against EndpointProxy.SendAndWait.
            var msg = new Dictionary<string, object>
            {
                { MessageFields.KIND, MessageKind.Data },
                { MessageFields.DATA_TOPIC, topic },
                { MessageFields.DATA_CONTENT, content },
                { MessageFields.DATA_INCOMING_USER_SESSION, session }
            };

            // 2000 is the wait limit passed to SendAndWait (presumably milliseconds — TODO confirm).
            this._serverProxy.SendAndWait(msg, 2000);
        }
        /// <summary>
        /// Sends a message-pull request to the server.
        /// </summary>
        protected internal void PullRequest()
        {
            // NOTE(review): originally `new Dictionary()`, which has no type arguments and does
            // not compile. <string, object> is inferred — confirm against EndpointProxy.Send.
            var msg = new Dictionary<string, object>
            {
                { MessageFields.KIND, MessageKind.PullRequest }
            };

            this._serverProxy.Send(msg);
        }
        /// <summary>
        /// Confirms a message to the server.
        /// </summary>
        /// <param name="id">Id of the message being confirmed.</param>
        protected internal void Confirm(long id)
        {
            // NOTE(review): originally `new Dictionary()`, which has no type arguments and does
            // not compile. <string, object> is inferred from the enum/long values added here —
            // confirm against EndpointProxy.Send.
            var msg = new Dictionary<string, object>
            {
                { MessageFields.KIND, MessageKind.Confirm },
                { MessageFields.CONFIRM_ID, id }
            };

            this._serverProxy.Send(msg);
        }
        /// <summary>
        /// Reports to the server that processing of a message failed.
        /// </summary>
        /// <param name="id">Id of the message that could not be processed.</param>
        /// <param name="errorMsg">Failure reason sent along with the report.</param>
        protected internal void Fail(long id,string errorMsg)
        {
            // NOTE(review): originally `new Dictionary()`, which has no type arguments and does
            // not compile. <string, object> is inferred from the enum/long/string values added
            // here — confirm against EndpointProxy.Send.
            var msg = new Dictionary<string, object>
            {
                { MessageFields.KIND, MessageKind.Failed },
                { MessageFields.CONFIRM_ID, id },
                { MessageFields.CONFIRM_MSG, errorMsg }
            };

            this._serverProxy.Send(msg);
        }
        /// <summary>
        /// Installs the default logger, creates the local endpoint with a shared channel selector
        /// using the configured heartbeat period, and subscribes to the endpoint's message events.
        /// </summary>
        private void PrepareEndpoint()
        {
            this.Log = Top.Api.Log.Instance;
            this._endpoint = new Endpoint(Log, this._id);
            this._endpoint.ChannelSelector = new ClientChannelSharedSelector(Log) { HeartbeatPeriod = this._heartbeatInterval };

            // Subscribe through method-group conversion. The former `new EventHandler(...)`
            // wrappers named the non-generic delegate, whose (object, EventArgs) signature does
            // not match these handlers; the method group binds to whatever delegate type the
            // events declare.
            this._endpoint.OnMessage += this.InternalOnMessage;
            this._endpoint.OnAckMessage += this.InternalOnAckMessage;
        }
        /// <summary>
        /// Handles a message received from the endpoint: optionally traces it, then dispatches it
        /// to the <c>OnMessage</c> subscribers on a thread-pool thread and reports the outcome to
        /// the server (Fail when the handler throws or marks the message failed, Confirm otherwise
        /// unless the handler already confirmed it).
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="context">Endpoint context carrying the raw message and its origin.</param>
        private void InternalOnMessage(object sender, EndpointContext context)
        {
            if (enableTraceLog)
            {
                this.Log.Info("messsage from {0}: {1}", context.MessageFrom, this.Dump(context.Message));
            }

            // Snapshot the subscribers on the receiving thread. Re-reading the event inside the
            // work item would race with an unsubscribe and could invoke a null delegate.
            var handler = this.OnMessage;
            if (handler == null)
                return;

            ThreadPool.QueueUserWorkItem(o =>
            {
                // Nothing may escape this callback: an unhandled exception on a thread-pool
                // thread terminates the whole process.
                try
                {
                    if (!this.running)
                    {
                        this.Log.Info(string.Format("message dropped as client closed: {0}", this.Dump(context.Message)));
                        return;
                    }

                    Message msg = this.ParseMessage(context.Message);
                    var args = new MessageArgs(msg, m => this.Confirm(m.Id));
                    var sw = new Stopwatch();
                    try
                    {
                        sw.Start();
                        handler(this, args);
                        sw.Stop();
                    }
                    catch (Exception e)
                    {
                        // A throwing handler counts as a failed message.
                        args.Fail(e.Message);
                    }

                    if (args._isFail)
                    {
                        // The reason may be null when the handler failed the message without one.
                        string reason = args._reason ?? string.Empty;
                        this.Log.Info("process message error: {0}", reason);
                        // The reason sent to the server is capped at 128 characters.
                        this.Fail(msg.Id, reason.Length > 128 ? reason.Substring(0, 128) : reason);
                        return;
                    }

                    // Original note: "prevent confirm attach". Handlers that return within ~1 ms
                    // are delayed briefly before confirming (presumably to throttle confirms — TODO confirm intent).
                    if (sw.ElapsedMilliseconds <= 1)
                    {
                        Thread.Sleep(10);
                    }

                    // The handler already confirmed the message itself.
                    if (args._isConfirmed)
                    {
                        return;
                    }

                    try
                    {
                        this.Confirm(msg.Id);
                        if (enableTraceLog)
                        {
                            this.Log.Info("confirm message topic: {0}, dataid: {1}", msg.Topic, msg.Dataid);
                        }
                    }
                    catch (Exception e)
                    {
                        // Log the whole exception (type, message and stack), not only the stack trace.
                        this.Log.Warn(string.Format("confirm message {0} error {1}", this.Dump(context.Message), e));
                    }
                }
                catch (Exception unexpected)
                {
                    // Parsing, failure reporting or logging threw; record it instead of crashing.
                    this.Log.Warn(string.Format("process message error: {0}", unexpected));
                }
            });
        }
        /// <summary>
        /// Logs an acknowledgement received from the endpoint when debug logging is enabled.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Acknowledgement details: origin and message.</param>
        private void InternalOnAckMessage(object sender, AckMessageArgs e)
        {
            // Nothing to do unless debug output is on.
            if (!this.Log.IsDebugEnabled())
            {
                return;
            }

            this.Log.Debug("ack messsage from {0}: {1}", e.MessageFrom, e.Message);
        }
        /// <summary>
        /// Starts the reconnect timer unless it already exists. On every tick the timer reconnects
        /// to the last used URI when the server proxy has no valid sender; errors are logged.
        /// </summary>
        private void StartReconnect()
        {
            if (this._reconnectTimer != null)
            {
                return;
            }

            TimerCallback reconnectIfNeeded = state =>
            {
                try
                {
                    // Connection is still usable: nothing to do on this tick.
                    if (this._serverProxy.hasValidSender())
                    {
                        return;
                    }

                    this.Log.Info("reconning...");
                    this.doConnect(this._uri);
                }
                catch (Exception e)
                {
                    this.Log.Warn("reconnect error", e);
                }
            };

            // First tick and every following tick use the same interval.
            TimeSpan interval = TimeSpan.FromSeconds(this._reconnectIntervalSeconds);
            this._reconnectTimer = new Timer(reconnectIfNeeded, null, interval, interval);
        }
        /// <summary>
        /// Starts the pull-request timer unless it already exists. The first tick fires after
        /// 500 ms, later ticks every <c>PullRequestIntervalSeconds</c> seconds; each tick sends a
        /// pull request when the server proxy has a valid sender. Errors are logged.
        /// </summary>
        private void StartPullRequest()
        {
            if (this._pullRequestTimer != null)
            {
                return;
            }

            TimerCallback pullIfConnected = state =>
            {
                try
                {
                    // Skip this tick while there is no usable connection.
                    if (!this._serverProxy.hasValidSender())
                    {
                        return;
                    }

                    this.PullRequest();
                }
                catch (Exception e)
                {
                    this.Log.Warn("pull request error", e);
                }
            };

            TimeSpan firstDelay = TimeSpan.FromMilliseconds(500);
            TimeSpan period = TimeSpan.FromSeconds(this.PullRequestIntervalSeconds);
            this._pullRequestTimer = new Timer(pullIfConnected, null, firstDelay, period);
        }
        private Message ParseMessage(IDictionary raw)
        {
            var msg = new Message();
            msg.Id = this.GetValue(raw, MessageFields.OUTGOING_ID);
            msg.Topic = this.GetValue(raw, MessageFields.DATA_TOPIC);
            msg.PubAppKey = this.GetValue(raw, MessageFields.DATA_OUTGOING_PUBLISHER);
            msg.PubTime = this.GetValue(raw, MessageFields.DATA_PUBLISH_TIME);
            msg.UserId = this.GetValue(raw, MessageFields.DATA_OUTGOING_USER_ID);
            msg.UserNick = this.GetValue(raw, MessageFields.DATA_OUTGOING_USER_NICK);
            msg.OutgoingTime = this.GetValue(raw, MessageFields.DATA_ATTACH_OUTGOING_TIME);
            msg.Dataid = this.GetValue