using System; 
 | 
using System.Collections; 
 | 
using System.Collections.Generic; 
 | 
using System.Diagnostics; 
 | 
using System.Text; 
 | 
using System.Threading; 
 | 
using Taobao.Top.Link.Channel; 
 | 
using Taobao.Top.Link.Endpoints; 
 | 
using Taobao.Top.Link.Util; 
 | 
using Top.Api; 
 | 
using Top.Api.Util; 
 | 
  
 | 
namespace Top.Tmc 
 | 
{ 
 | 
    /// <summary>消息服务客户端</summary> 
 | 
    public class TmcClient 
 | 
    { 
 | 
        // sign parameters 
 | 
        private const string GROUP_NAME = "group_name"; 
 | 
        private const string SDK = "sdk"; 
 | 
        private const string INTRANET_IP = "intranet_ip"; 
 | 
  
 | 
        private TmcClientIdentity _id; 
 | 
        private string _appSecret; 
 | 
        private string _uri; 
 | 
        private Endpoint _endpoint; 
 | 
        private EndpointProxy _serverProxy; 
 | 
  
 | 
        private volatile bool running; 
 | 
  
 | 
        private int _heartbeatInterval = 45000; // 心跳频率(单位:毫秒) 
 | 
        private int _reconnectIntervalSeconds = 15; // 重连周期(单位:秒) 
 | 
        private int _pullRequestIntervalSeconds = 30; // 定时获取消息周期(单位:秒) 
 | 
        private Timer _reconnectTimer; 
 | 
        private Timer _pullRequestTimer; 
 | 
  
 | 
        public event EventHandler<MessageArgs> OnMessage; 
 | 
  
 | 
        /// <summary>获取或设置Log</summary> 
 | 
        public ITopLogger Log { get; set; } 
 | 
  
 | 
        private bool enableTraceLog = true; 
 | 
  
 | 
        public bool EnableTraceLog 
 | 
        { 
 | 
            get { return this.enableTraceLog;} 
 | 
            set { this.enableTraceLog = value; } 
 | 
        } 
 | 
  
 | 
        /// <summary>获取或设置定时发送拉取请求的周期(单位:秒)</summary> 
 | 
        public int PullRequestIntervalSeconds 
 | 
        { 
 | 
            get { return this._pullRequestIntervalSeconds; } 
 | 
            set 
 | 
            { 
 | 
                this._pullRequestIntervalSeconds = value; 
 | 
                if (this._pullRequestTimer != null) 
 | 
                    this._pullRequestTimer.Change(TimeSpan.FromSeconds(this._pullRequestIntervalSeconds) 
 | 
                        , TimeSpan.FromSeconds(this._pullRequestIntervalSeconds)); 
 | 
            } 
 | 
        } 
 | 
  
 | 
        /// <summary>获取或设置自动重连间隔(单位:秒)</summary> 
 | 
        public int ReconnectIntervalSeconds 
 | 
        { 
 | 
            get { return this._reconnectIntervalSeconds; } 
 | 
            set 
 | 
            { 
 | 
                this._reconnectIntervalSeconds = value; 
 | 
                if (this._reconnectTimer != null) 
 | 
                    this._reconnectTimer.Change(TimeSpan.FromSeconds(this._reconnectIntervalSeconds) 
 | 
                        , TimeSpan.FromSeconds(this._reconnectIntervalSeconds)); 
 | 
            } 
 | 
        } 
 | 
  
 | 
        /// <summary>以默认分组,初始化TMC客户端</summary> 
 | 
        public TmcClient(string appKey, string appSecret) : this(appKey, appSecret, "default") { } 
 | 
  
 | 
        /// <summary>初始化TMC客户端</summary> 
 | 
        public TmcClient(string appKey, string appSecret, string groupName) 
 | 
        { 
 | 
            this._appSecret = appSecret; 
 | 
            this._id = new TmcClientIdentity(appKey, groupName); 
 | 
            this.PrepareEndpoint(); 
 | 
        } 
 | 
  
 | 
        /// <summary>连接到 TMC Server</summary> 
 | 
        /// <param name="uri">TMC server address, eg: ws://mc.api.taobao.com/</param> 
 | 
        public void Connect(string uri) 
 | 
        { 
 | 
            this.running = true; 
 | 
            doConnect(uri); 
 | 
            this.StartReconnect(); 
 | 
            this.StartPullRequest(); 
 | 
        } 
 | 
  
 | 
        private void doConnect(string uri) 
 | 
        { 
 | 
            var signHeader = new Dictionary<string, string>(); 
 | 
            var connHeader = new Dictionary<string, object>(); 
 | 
            signHeader.Add(Constants.APP_KEY, this._id.AppKey); 
 | 
            connHeader.Add(Constants.APP_KEY, signHeader[Constants.APP_KEY]); 
 | 
  
 | 
            signHeader.Add(GROUP_NAME, this._id.GroupName); 
 | 
            connHeader.Add(GROUP_NAME, signHeader[GROUP_NAME]); 
 | 
  
 | 
            signHeader.Add(Constants.TIMESTAMP, DateTime.Now.Ticks.ToString()); 
 | 
            connHeader.Add(Constants.TIMESTAMP, signHeader[Constants.TIMESTAMP]); 
 | 
  
 | 
            connHeader.Add(Constants.SIGN, TopUtils.SignTopRequest(signHeader, this._appSecret, Constants.SIGN_METHOD_MD5)); 
 | 
            //extra fields 
 | 
            connHeader.Add(SDK, Constants.SDK_VERSION); 
 | 
            connHeader.Add(INTRANET_IP, TopUtils.GetIntranetIp()); 
 | 
            this._serverProxy = this._endpoint.GetEndpoint(new TmcServerIdentity(), uri, connHeader); 
 | 
            this._uri = uri; 
 | 
            this.Log.Info("connected to tmc server: {0}", uri); 
 | 
        } 
 | 
  
 | 
        /// <summary>向指定的主题发布一条与用户无关的消息。</summary> 
 | 
        /// <param name="topic">主题名称</param> 
 | 
        /// <param name="content">严格根据主题定义的消息内容(JSON/XML)</param> 
 | 
        public void Send(string topic, string content) 
 | 
        { 
 | 
            if (string.IsNullOrEmpty(topic)) 
 | 
                throw new ArgumentNullException("topic"); 
 | 
            if (string.IsNullOrEmpty(content)) 
 | 
                throw new ArgumentNullException("content"); 
 | 
  
 | 
            IDictionary<string, object> msg = new Dictionary<string, object>(); 
 | 
            msg.Add(MessageFields.KIND, MessageKind.Data); 
 | 
            msg.Add(MessageFields.DATA_TOPIC, topic); 
 | 
            msg.Add(MessageFields.DATA_CONTENT, content); 
 | 
            this._serverProxy.SendAndWait(msg, 2000); 
 | 
        } 
 | 
  
 | 
        /// <summary>向指定的主题发布一条与用户相关的消息。</summary> 
 | 
        /// <param name="topic">主题名称</param> 
 | 
        /// <param name="content">严格根据主题定义的消息内容(JSON/XML)</param> 
 | 
        /// <param name="session">用户授权码</param> 
 | 
        public void Send(string topic, string content, string session) 
 | 
        { 
 | 
            if (string.IsNullOrEmpty(topic)) 
 | 
                throw new ArgumentNullException("topic"); 
 | 
            if (string.IsNullOrEmpty(content)) 
 | 
                throw new ArgumentNullException("content"); 
 | 
            if (string.IsNullOrEmpty(session)) 
 | 
                throw new ArgumentNullException("session"); 
 | 
  
 | 
            IDictionary<string, object> msg = new Dictionary<string, object>(); 
 | 
            msg.Add(MessageFields.KIND, MessageKind.Data); 
 | 
            msg.Add(MessageFields.DATA_TOPIC, topic); 
 | 
            msg.Add(MessageFields.DATA_CONTENT, content); 
 | 
            msg.Add(MessageFields.DATA_INCOMING_USER_SESSION, session); 
 | 
            this._serverProxy.SendAndWait(msg, 2000); 
 | 
        } 
 | 
  
 | 
        /// <summary>向服务端发送消息拉取请求</summary> 
 | 
        protected internal void PullRequest() 
 | 
        { 
 | 
            IDictionary<string, object> msg = new Dictionary<string, object>(); 
 | 
            msg.Add(MessageFields.KIND, MessageKind.PullRequest); 
 | 
            this._serverProxy.Send(msg); 
 | 
        } 
 | 
  
 | 
        /// <summary>确认消息</summary> 
 | 
        protected internal void Confirm(long id) 
 | 
        { 
 | 
            IDictionary<string, object> msg = new Dictionary<string, object>(); 
 | 
            msg.Add(MessageFields.KIND, MessageKind.Confirm); 
 | 
            msg.Add(MessageFields.CONFIRM_ID, id); 
 | 
            this._serverProxy.Send(msg); 
 | 
        } 
 | 
  
 | 
        /// <summary>确认消息</summary> 
 | 
        protected internal void Fail(long id,string errorMsg) 
 | 
        { 
 | 
            IDictionary<string, object> msg = new Dictionary<string, object>(); 
 | 
            msg.Add(MessageFields.KIND, MessageKind.Failed); 
 | 
            msg.Add(MessageFields.CONFIRM_ID, id); 
 | 
            msg.Add(MessageFields.CONFIRM_MSG, errorMsg); 
 | 
  
 | 
            this._serverProxy.Send(msg); 
 | 
        } 
 | 
  
 | 
        private void PrepareEndpoint() 
 | 
        { 
 | 
            this.Log = Top.Api.Log.Instance; 
 | 
            this._endpoint = new Endpoint(Log, this._id); 
 | 
            this._endpoint.ChannelSelector = new ClientChannelSharedSelector(Log) { HeartbeatPeriod = this._heartbeatInterval }; 
 | 
            this._endpoint.OnMessage += new EventHandler<EndpointContext>(InternalOnMessage); 
 | 
            this._endpoint.OnAckMessage += new EventHandler<AckMessageArgs>(InternalOnAckMessage); 
 | 
        } 
 | 
  
 | 
        private void InternalOnMessage(object sender, EndpointContext context) 
 | 
        { 
 | 
            if (enableTraceLog) 
 | 
            { 
 | 
                this.Log.Info("messsage from {0}: {1}", context.MessageFrom, this.Dump(context.Message)); 
 | 
            } 
 | 
             
 | 
            if (this.OnMessage == null) 
 | 
                return; 
 | 
  
 | 
            ThreadPool.QueueUserWorkItem(o => 
 | 
            { 
 | 
                if (!this.running) 
 | 
                { 
 | 
                    this.Log.Info(string.Format("message dropped as client closed: {0}", this.Dump(context.Message))); 
 | 
                    return; 
 | 
                } 
 | 
  
 | 
                Message msg = this.ParseMessage(context.Message); 
 | 
                var args = new MessageArgs(msg, m => this.Confirm(m.Id)); 
 | 
                var sw = new Stopwatch(); 
 | 
                try 
 | 
                { 
 | 
                    sw.Start(); 
 | 
                    this.OnMessage(this, args); 
 | 
                    sw.Stop(); 
 | 
                } 
 | 
                catch (Exception e) 
 | 
                { 
 | 
                    args.Fail(e.Message); 
 | 
                } 
 | 
  
 | 
                if (args._isFail) 
 | 
                { 
 | 
                    this.Log.Info("process message error: {0}", args._reason); 
 | 
                    this.Fail(msg.Id,args._reason.Length > 128 ? args._reason.Substring(0,128) : args._reason); 
 | 
                    return; 
 | 
                } 
 | 
  
 | 
                // prevent confirm attach 
 | 
                if (sw.ElapsedMilliseconds <= 1) 
 | 
                { 
 | 
                    Thread.Sleep(10); 
 | 
                } 
 | 
  
 | 
                if (args._isConfirmed){ 
 | 
                    return; 
 | 
                } 
 | 
  
 | 
                try 
 | 
                { 
 | 
                    this.Confirm(msg.Id); 
 | 
  
 | 
                    if (enableTraceLog) 
 | 
                    { 
 | 
                        this.Log.Info("confirm message topic: {0}, dataid: {1}", msg.Topic, msg.Dataid); 
 | 
                    } 
 | 
                } 
 | 
                catch (Exception e) 
 | 
                { 
 | 
                    this.Log.Warn(string.Format("confirm message {0} error {1}", this.Dump(context.Message), e.StackTrace)); 
 | 
                } 
 | 
            }); 
 | 
        } 
 | 
  
 | 
        private void InternalOnAckMessage(object sender, AckMessageArgs e) 
 | 
        { 
 | 
            if (this.Log.IsDebugEnabled()) 
 | 
                this.Log.Debug("ack messsage from {0}: {1}", e.MessageFrom, e.Message); 
 | 
        } 
 | 
  
 | 
        private void StartReconnect() 
 | 
        { 
 | 
            if (this._reconnectTimer != null) return; 
 | 
            this._reconnectTimer = new Timer(o => 
 | 
            { 
 | 
                try 
 | 
                { 
 | 
                    if (!this._serverProxy.hasValidSender()) 
 | 
                    { 
 | 
                        this.Log.Info("reconning..."); 
 | 
                        this.doConnect(this._uri); 
 | 
                    } 
 | 
                } 
 | 
                catch (Exception e) 
 | 
                { 
 | 
                    this.Log.Warn("reconnect error", e); 
 | 
                } 
 | 
            }, null 
 | 
            , TimeSpan.FromSeconds(this._reconnectIntervalSeconds) 
 | 
            , TimeSpan.FromSeconds(this._reconnectIntervalSeconds)); 
 | 
        } 
 | 
  
 | 
        private void StartPullRequest() 
 | 
        { 
 | 
            if (this._pullRequestTimer != null) return; 
 | 
            this._pullRequestTimer = new Timer(o => 
 | 
            { 
 | 
                try 
 | 
                { 
 | 
                    if (this._serverProxy.hasValidSender()) 
 | 
                    { 
 | 
                        this.PullRequest(); 
 | 
                    } 
 | 
                } 
 | 
                catch (Exception e) 
 | 
                { 
 | 
                    this.Log.Warn("pull request error", e); 
 | 
                } 
 | 
            } 
 | 
            , null 
 | 
            , TimeSpan.FromMilliseconds(500) 
 | 
            , TimeSpan.FromSeconds(this.PullRequestIntervalSeconds)); 
 | 
        } 
 | 
  
 | 
        private Message ParseMessage(IDictionary<string, object> raw) 
 | 
        { 
 | 
            var msg = new Message(); 
 | 
            msg.Id = this.GetValue<long>(raw, MessageFields.OUTGOING_ID); 
 | 
            msg.Topic = this.GetValue<string>(raw, MessageFields.DATA_TOPIC); 
 | 
            msg.PubAppKey = this.GetValue<string>(raw, MessageFields.DATA_OUTGOING_PUBLISHER); 
 | 
            msg.PubTime = this.GetValue<DateTime>(raw, MessageFields.DATA_PUBLISH_TIME); 
 | 
            msg.UserId = this.GetValue<long>(raw, MessageFields.DATA_OUTGOING_USER_ID); 
 | 
            msg.UserNick = this.GetValue<string>(raw, MessageFields.DATA_OUTGOING_USER_NICK); 
 | 
            msg.OutgoingTime = this.GetValue<DateTime>(raw, MessageFields.DATA_ATTACH_OUTGOING_TIME); 
 | 
            msg.Dataid = this.GetValue<object>(raw, MessageFields.DATA_DATAID); 
 | 
  
 | 
            if (!raw.ContainsKey(MessageFields.DATA_CONTENT)) 
 | 
                return msg; 
 | 
            msg.Content = raw[MessageFields.DATA_CONTENT] is byte[] 
 | 
                ? Encoding.UTF8.GetString(GZIPHelper.Unzip(raw[MessageFields.DATA_CONTENT] as byte[])) 
 | 
                : (string)raw[MessageFields.DATA_CONTENT]; 
 | 
  
 | 
            return msg; 
 | 
        } 
 | 
  
 | 
        private T GetValue<T>(IDictionary<string, object> raw, string key) 
 | 
        { 
 | 
            if (raw.ContainsKey(key)) 
 | 
            { 
 | 
                object value = raw[key]; 
 | 
                if (value != null) 
 | 
                { 
 | 
                    return (T)value; 
 | 
                } 
 | 
            } 
 | 
            return default(T); 
 | 
        } 
 | 
  
 | 
        private string Dump(IDictionary<string, object> raw) 
 | 
        { 
 | 
            var buf = new StringBuilder(); 
 | 
            foreach (var i in raw) 
 | 
                buf.AppendFormat("{0}={1}|", i.Key, i.Value); 
 | 
            return buf.ToString(); 
 | 
        } 
 | 
  
 | 
        public void Close() 
 | 
        { 
 | 
            this.running = false; 
 | 
            if (this._pullRequestTimer != null) 
 | 
            { 
 | 
                this._pullRequestTimer.Dispose(); 
 | 
                this._pullRequestTimer = null; 
 | 
            } 
 | 
            if (this._reconnectTimer != null) 
 | 
            { 
 | 
                this._reconnectTimer.Dispose(); 
 | 
                this._reconnectTimer = null; 
 | 
            } 
 | 
            this._serverProxy.Close(this._uri, "client closed"); 
 | 
            this.Log.Warn("tmc client closed"); 
 | 
        } 
 | 
  
 | 
        public bool Online 
 | 
        { 
 | 
            get 
 | 
            { 
 | 
                return this._serverProxy != null && this._serverProxy.hasValidSender(); 
 | 
            } 
 | 
        } 
 | 
    } 
 | 
} 
 |