using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using Taobao.Top.Link.Channel;
using Taobao.Top.Link.Endpoints;
using Taobao.Top.Link.Util;
using Top.Api;
using Top.Api.Util;

namespace Top.Tmc
{
    /// <summary>
    /// Message service (TMC) client. Connects to a TMC server, periodically sends
    /// pull requests, re-establishes the connection when the sender is no longer
    /// valid, and raises <see cref="OnMessage"/> for every incoming message.
    /// </summary>
    public class TmcClient
    {
        // Names of the connection headers (group_name also takes part in the signature).
        private const string GROUP_NAME = "group_name";
        private const string SDK = "sdk";
        private const string INTRANET_IP = "intranet_ip";

        // Timeout, in milliseconds, used when waiting for the server to accept a published message.
        private const int SEND_TIMEOUT_MS = 2000;
        // Failure reasons longer than this are truncated before being reported to the server.
        private const int MAX_FAIL_REASON_LENGTH = 128;

        private readonly TmcClientIdentity _id;
        private readonly string _appSecret;
        private string _uri;
        private Endpoint _endpoint;
        private EndpointProxy _serverProxy;
        private volatile bool running;

        private int _heartbeatInterval = 45000;        // heartbeat period, in milliseconds
        private int _reconnectIntervalSeconds = 15;    // reconnect check period, in seconds
        private int _pullRequestIntervalSeconds = 30;  // pull request period, in seconds
        private Timer _reconnectTimer;
        private Timer _pullRequestTimer;

        /// <summary>
        /// Raised on a thread-pool thread for every message received from the server.
        /// Messages that arrive while no handler is subscribed are not processed.
        /// </summary>
        public event EventHandler<MessageArgs> OnMessage;

        /// <summary>Gets or sets the logger used by this client.</summary>
        public ITopLogger Log { get; set; }

        private bool enableTraceLog = true;

        /// <summary>
        /// Gets or sets whether every received and confirmed message is written to the log.
        /// </summary>
        public bool EnableTraceLog
        {
            get { return this.enableTraceLog; }
            set { this.enableTraceLog = value; }
        }

        /// <summary>
        /// Gets or sets the period, in seconds, between pull requests sent to the server.
        /// When the pull timer is already running it is rescheduled immediately.
        /// </summary>
        public int PullRequestIntervalSeconds
        {
            get { return this._pullRequestIntervalSeconds; }
            set
            {
                this._pullRequestIntervalSeconds = value;
                if (this._pullRequestTimer != null)
                {
                    TimeSpan period = TimeSpan.FromSeconds(this._pullRequestIntervalSeconds);
                    this._pullRequestTimer.Change(period, period);
                }
            }
        }

        /// <summary>
        /// Gets or sets the period, in seconds, between automatic reconnect checks.
        /// When the reconnect timer is already running it is rescheduled immediately.
        /// </summary>
        public int ReconnectIntervalSeconds
        {
            get { return this._reconnectIntervalSeconds; }
            set
            {
                this._reconnectIntervalSeconds = value;
                if (this._reconnectTimer != null)
                {
                    TimeSpan period = TimeSpan.FromSeconds(this._reconnectIntervalSeconds);
                    this._reconnectTimer.Change(period, period);
                }
            }
        }

        /// <summary>Initializes a TMC client that uses the "default" group.</summary>
        /// <param name="appKey">Application key.</param>
        /// <param name="appSecret">Application secret used to sign the connect request.</param>
        public TmcClient(string appKey, string appSecret)
            : this(appKey, appSecret, "default")
        {
        }

        /// <summary>Initializes a TMC client for the given group.</summary>
        /// <param name="appKey">Application key.</param>
        /// <param name="appSecret">Application secret used to sign the connect request.</param>
        /// <param name="groupName">Message group name.</param>
        public TmcClient(string appKey, string appSecret, string groupName)
        {
            this._appSecret = appSecret;
            this._id = new TmcClientIdentity(appKey, groupName);
            this.PrepareEndpoint();
        }

        /// <summary>
        /// Connects to the TMC server and starts the reconnect and pull-request timers.
        /// </summary>
        /// <param name="uri">TMC server address, e.g. ws://mc.api.taobao.com/</param>
        /// <exception cref="ArgumentNullException"><paramref name="uri"/> is null or empty.</exception>
        public void Connect(string uri)
        {
            if (string.IsNullOrEmpty(uri))
                throw new ArgumentNullException("uri");

            // Set before connecting so that messages arriving right after the
            // connection is established are not dropped as "client closed".
            this.running = true;
            this.doConnect(uri);
            this.StartReconnect();
            this.StartPullRequest();
        }

        /// <summary>
        /// Builds the signed connection headers and obtains the server proxy for <paramref name="uri"/>.
        /// </summary>
        private void doConnect(string uri)
        {
            // NOTE(review): the dictionary type arguments are inferred from how the values
            // are used (string values are signed; extra headers are passed as objects) —
            // confirm against TopUtils.SignTopRequest and Endpoint.GetEndpoint.
            var signHeader = new Dictionary<string, string>();
            var connHeader = new Dictionary<string, object>();

            signHeader.Add(Constants.APP_KEY, this._id.AppKey);
            connHeader.Add(Constants.APP_KEY, signHeader[Constants.APP_KEY]);
            signHeader.Add(GROUP_NAME, this._id.GroupName);
            connHeader.Add(GROUP_NAME, signHeader[GROUP_NAME]);
            signHeader.Add(Constants.TIMESTAMP, DateTime.Now.Ticks.ToString());
            connHeader.Add(Constants.TIMESTAMP, signHeader[Constants.TIMESTAMP]);
            connHeader.Add(Constants.SIGN, TopUtils.SignTopRequest(signHeader, this._appSecret, Constants.SIGN_METHOD_MD5));

            // Extra fields that are sent but not part of the signature.
            connHeader.Add(SDK, Constants.SDK_VERSION);
            connHeader.Add(INTRANET_IP, TopUtils.GetIntranetIp());

            this._serverProxy = this._endpoint.GetEndpoint(new TmcServerIdentity(), uri, connHeader);
            this._uri = uri;
            this.Log.Info("connected to tmc server: {0}", uri);
        }

        /// <summary>Publishes a message that is not related to a user to the given topic.</summary>
        /// <param name="topic">Topic name.</param>
        /// <param name="content">Message content (JSON/XML) strictly following the topic definition.</param>
        /// <exception cref="ArgumentNullException">An argument is null or empty.</exception>
        /// <exception cref="InvalidOperationException">The client has not been connected.</exception>
        public void Send(string topic, string content)
        {
            if (string.IsNullOrEmpty(topic))
                throw new ArgumentNullException("topic");
            if (string.IsNullOrEmpty(content))
                throw new ArgumentNullException("content");

            this.SendData(topic, content, null);
        }

        /// <summary>Publishes a user-related message to the given topic.</summary>
        /// <param name="topic">Topic name.</param>
        /// <param name="content">Message content (JSON/XML) strictly following the topic definition.</param>
        /// <param name="session">User authorization session.</param>
        /// <exception cref="ArgumentNullException">An argument is null or empty.</exception>
        /// <exception cref="InvalidOperationException">The client has not been connected.</exception>
        public void Send(string topic, string content, string session)
        {
            if (string.IsNullOrEmpty(topic))
                throw new ArgumentNullException("topic");
            if (string.IsNullOrEmpty(content))
                throw new ArgumentNullException("content");
            if (string.IsNullOrEmpty(session))
                throw new ArgumentNullException("session");

            this.SendData(topic, content, session);
        }

        /// <summary>Sends a pull request to the server.</summary>
        /// <exception cref="InvalidOperationException">The client has not been connected.</exception>
        protected internal void PullRequest()
        {
            IDictionary<string, object> msg = new Dictionary<string, object>();
            msg.Add(MessageFields.KIND, MessageKind.PullRequest);
            this.GetConnectedProxy().Send(msg);
        }

        /// <summary>Confirms the message with the given id.</summary>
        /// <param name="id">Id of the message being confirmed.</param>
        /// <exception cref="InvalidOperationException">The client has not been connected.</exception>
        protected internal void Confirm(long id)
        {
            IDictionary<string, object> msg = new Dictionary<string, object>();
            msg.Add(MessageFields.KIND, MessageKind.Confirm);
            msg.Add(MessageFields.CONFIRM_ID, id);
            this.GetConnectedProxy().Send(msg);
        }

        /// <summary>Reports that processing of the message with the given id failed.</summary>
        /// <param name="id">Id of the message that failed.</param>
        /// <param name="errorMsg">Failure reason sent to the server.</param>
        /// <exception cref="InvalidOperationException">The client has not been connected.</exception>
        protected internal void Fail(long id, string errorMsg)
        {
            IDictionary<string, object> msg = new Dictionary<string, object>();
            msg.Add(MessageFields.KIND, MessageKind.Failed);
            msg.Add(MessageFields.CONFIRM_ID, id);
            msg.Add(MessageFields.CONFIRM_MSG, errorMsg);
            this.GetConnectedProxy().Send(msg);
        }

        /// <summary>
        /// Builds a data message and sends it, waiting up to <see cref="SEND_TIMEOUT_MS"/> milliseconds.
        /// The session field is only added when <paramref name="session"/> is not null.
        /// </summary>
        private void SendData(string topic, string content, string session)
        {
            IDictionary<string, object> msg = new Dictionary<string, object>();
            msg.Add(MessageFields.KIND, MessageKind.Data);
            msg.Add(MessageFields.DATA_TOPIC, topic);
            msg.Add(MessageFields.DATA_CONTENT, content);
            if (session != null)
                msg.Add(MessageFields.DATA_INCOMING_USER_SESSION, session);

            this.GetConnectedProxy().SendAndWait(msg, SEND_TIMEOUT_MS);
        }

        /// <summary>
        /// Returns the current server proxy, or throws a descriptive exception instead of a
        /// NullReferenceException when <see cref="Connect"/> has not been called yet.
        /// </summary>
        private EndpointProxy GetConnectedProxy()
        {
            EndpointProxy proxy = this._serverProxy;
            if (proxy == null)
                throw new InvalidOperationException("tmc client is not connected, call Connect first");
            return proxy;
        }

        /// <summary>Creates the endpoint, configures its channel selector and hooks the message events.</summary>
        private void PrepareEndpoint()
        {
            this.Log = Top.Api.Log.Instance;
            this._endpoint = new Endpoint(Log, this._id);
            this._endpoint.ChannelSelector = new ClientChannelSharedSelector(Log)
            {
                HeartbeatPeriod = this._heartbeatInterval
            };
            this._endpoint.OnMessage += this.InternalOnMessage;
            this._endpoint.OnAckMessage += this.InternalOnAckMessage;
        }

        /// <summary>
        /// Endpoint callback: logs the raw message (when trace logging is on) and queues its
        /// processing on the thread pool. Nothing is queued when no handler is subscribed.
        /// </summary>
        private void InternalOnMessage(object sender, EndpointContext context)
        {
            if (enableTraceLog)
            {
                this.Log.Info("messsage from {0}: {1}", context.MessageFrom, this.Dump(context.Message));
            }

            // Snapshot the delegate once so a concurrent unsubscribe cannot turn the
            // later invocation into a NullReferenceException.
            EventHandler<MessageArgs> handler = this.OnMessage;
            if (handler == null)
                return;

            ThreadPool.QueueUserWorkItem(o => this.ProcessMessage(context, handler));
        }

        /// <summary>
        /// Parses one raw message, hands it to the subscriber and then confirms it or
        /// reports the failure to the server. Never lets an exception escape, because an
        /// unhandled exception on a thread-pool thread terminates the process.
        /// </summary>
        private void ProcessMessage(EndpointContext context, EventHandler<MessageArgs> handler)
        {
            try
            {
                if (!this.running)
                {
                    this.Log.Info(string.Format("message dropped as client closed: {0}", this.Dump(context.Message)));
                    return;
                }

                Message msg = this.ParseMessage(context.Message);
                var args = new MessageArgs(msg, m => this.Confirm(m.Id));
                var sw = new Stopwatch();

                try
                {
                    sw.Start();
                    handler(this, args);
                    sw.Stop();
                }
                catch (Exception e)
                {
                    // A throwing subscriber is treated the same as an explicit failure.
                    args.Fail(e.Message);
                }

                if (args._isFail)
                {
                    // The reason may be null when the subscriber failed the message without one.
                    string reason = args._reason ?? string.Empty;
                    this.Log.Info("process message error: {0}", reason);
                    this.Fail(msg.Id, reason.Length > MAX_FAIL_REASON_LENGTH
                        ? reason.Substring(0, MAX_FAIL_REASON_LENGTH)
                        : reason);
                    return;
                }

                // Original comment: "prevent confirm attach". A handler that returned within
                // ~1 ms is followed by a short pause before the confirm is sent.
                // NOTE(review): the exact server-side reason is not visible here — confirm.
                if (sw.ElapsedMilliseconds <= 1)
                {
                    Thread.Sleep(10);
                }

                // The subscriber already confirmed the message itself.
                if (args._isConfirmed)
                {
                    return;
                }

                try
                {
                    this.Confirm(msg.Id);
                    if (enableTraceLog)
                    {
                        this.Log.Info("confirm message topic: {0}, dataid: {1}", msg.Topic, msg.Dataid);
                    }
                }
                catch (Exception e)
                {
                    // Log the whole exception (type, message and stack), not just the stack trace.
                    this.Log.Warn(string.Format("confirm message {0} error {1}", this.Dump(context.Message), e));
                }
            }
            catch (Exception e)
            {
                // Covers parse errors and failures while reporting a failed message.
                this.Log.Warn(string.Format("handle message {0} error {1}", this.Dump(context.Message), e));
            }
        }

        /// <summary>Endpoint callback: logs acknowledgement messages when debug logging is enabled.</summary>
        private void InternalOnAckMessage(object sender, AckMessageArgs e)
        {
            if (this.Log.IsDebugEnabled())
                this.Log.Debug("ack messsage from {0}: {1}", e.MessageFrom, e.Message);
        }

        /// <summary>
        /// Starts (once) the timer that reconnects to the last used URI whenever the
        /// server proxy no longer has a valid sender.
        /// </summary>
        private void StartReconnect()
        {
            if (this._reconnectTimer != null)
                return;

            TimeSpan period = TimeSpan.FromSeconds(this._reconnectIntervalSeconds);
            this._reconnectTimer = new Timer(o =>
            {
                try
                {
                    // A callback that was already queued can still run after the timer is
                    // disposed; never reconnect a client that has been closed.
                    if (!this.running)
                        return;

                    if (!this._serverProxy.hasValidSender())
                    {
                        this.Log.Info("reconning...");
                        this.doConnect(this._uri);
                    }
                }
                catch (Exception e)
                {
                    this.Log.Warn("reconnect error", e);
                }
            }, null, period, period);
        }

        /// <summary>
        /// Starts (once) the timer that sends a pull request while the server proxy has a
        /// valid sender. The first request is sent after 500 ms.
        /// </summary>
        private void StartPullRequest()
        {
            if (this._pullRequestTimer != null)
                return;

            this._pullRequestTimer = new Timer(o =>
            {
                try
                {
                    // See StartReconnect: ignore callbacks that fire after Close().
                    if (!this.running)
                        return;

                    if (this._serverProxy.hasValidSender())
                    {
                        this.PullRequest();
                    }
                }
                catch (Exception e)
                {
                    this.Log.Warn("pull request error", e);
                }
            }, null, TimeSpan.FromMilliseconds(500), TimeSpan.FromSeconds(this.PullRequestIntervalSeconds));
        }

        /// <summary>
        /// Converts the raw field dictionary of an incoming message into a <see cref="Message"/>.
        /// Content delivered as a byte array is unzipped and decoded as UTF-8; otherwise it
        /// is cast to a string.
        /// </summary>
        private Message ParseMessage(IDictionary<string, object> raw)
        {
            // NOTE(review): the type arguments below are inferred (Id is passed to
            // Confirm(long)/Fail(long, ...); the others from the field names) — confirm
            // them against the property types declared on Message.
            var msg = new Message();
            msg.Id = this.GetValue<long>(raw, MessageFields.OUTGOING_ID);
            msg.Topic = this.GetValue<string>(raw, MessageFields.DATA_TOPIC);
            msg.PubAppKey = this.GetValue<string>(raw, MessageFields.DATA_OUTGOING_PUBLISHER);
            msg.PubTime = this.GetValue<DateTime>(raw, MessageFields.DATA_PUBLISH_TIME);
            msg.UserId = this.GetValue<long>(raw, MessageFields.DATA_OUTGOING_USER_ID);
            msg.UserNick = this.GetValue<string>(raw, MessageFields.DATA_OUTGOING_USER_NICK);
            msg.OutgoingTime = this.GetValue<DateTime>(raw, MessageFields.DATA_ATTACH_OUTGOING_TIME);
            msg.Dataid = this.GetValue<string>(raw, MessageFields.DATA_DATAID);

            object content;
            if (!raw.TryGetValue(MessageFields.DATA_CONTENT, out content))
                return msg;

            byte[] zipped = content as byte[];
            msg.Content = zipped != null
                ? Encoding.UTF8.GetString(GZIPHelper.Unzip(zipped))
                : (string)content;
            return msg;
        }

        /// <summary>
        /// Returns the value stored under <paramref name="key"/> cast to <typeparamref name="T"/>,
        /// or default(T) when the key is missing or its value is null.
        /// </summary>
        private T GetValue<T>(IDictionary<string, object> raw, string key)
        {
            object value;
            if (raw.TryGetValue(key, out value) && value != null)
            {
                return (T)value;
            }
            return default(T);
        }

        /// <summary>Formats all fields of a raw message as "key=value|" pairs for logging.</summary>
        private string Dump(IDictionary<string, object> raw)
        {
            var buf = new StringBuilder();
            foreach (var i in raw)
                buf.AppendFormat("{0}={1}|", i.Key, i.Value);
            return buf.ToString();
        }

        /// <summary>
        /// Stops both timers and closes the connection to the server. Safe to call on a
        /// client that was never connected.
        /// </summary>
        public void Close()
        {
            this.running = false;

            Timer pullTimer = this._pullRequestTimer;
            this._pullRequestTimer = null;
            if (pullTimer != null)
            {
                pullTimer.Dispose();
            }

            Timer reconnectTimer = this._reconnectTimer;
            this._reconnectTimer = null;
            if (reconnectTimer != null)
            {
                reconnectTimer.Dispose();
            }

            // The proxy only exists after a successful Connect().
            if (this._serverProxy != null)
            {
                this._serverProxy.Close(this._uri, "client closed");
            }
            this.Log.Warn("tmc client closed");
        }

        /// <summary>
        /// Gets whether the client currently has a server proxy with a valid sender.
        /// </summary>
        public bool Online
        {
            get { return this._serverProxy != null && this._serverProxy.hasValidSender(); }
        }
    }
}