using System;
|
using System.Collections;
|
using System.Collections.Generic;
|
using System.Diagnostics;
|
using System.Text;
|
using System.Threading;
|
using Taobao.Top.Link.Channel;
|
using Taobao.Top.Link.Endpoints;
|
using Taobao.Top.Link.Util;
|
using Top.Api;
|
using Top.Api.Util;
|
|
namespace Top.Tmc
|
{
|
/// <summary>消息服务客户端</summary>
|
public class TmcClient
|
{
|
// sign parameters
|
private const string GROUP_NAME = "group_name";
|
private const string SDK = "sdk";
|
private const string INTRANET_IP = "intranet_ip";
|
|
private TmcClientIdentity _id;
|
private string _appSecret;
|
private string _uri;
|
private Endpoint _endpoint;
|
private EndpointProxy _serverProxy;
|
|
private volatile bool running;
|
|
private int _heartbeatInterval = 45000; // 心跳频率(单位:毫秒)
|
private int _reconnectIntervalSeconds = 15; // 重连周期(单位:秒)
|
private int _pullRequestIntervalSeconds = 30; // 定时获取消息周期(单位:秒)
|
private Timer _reconnectTimer;
|
private Timer _pullRequestTimer;
|
|
public event EventHandler<MessageArgs> OnMessage;
|
|
/// <summary>获取或设置Log</summary>
|
public ITopLogger Log { get; set; }
|
|
private bool enableTraceLog = true;
|
|
public bool EnableTraceLog
|
{
|
get { return this.enableTraceLog;}
|
set { this.enableTraceLog = value; }
|
}
|
|
/// <summary>获取或设置定时发送拉取请求的周期(单位:秒)</summary>
|
public int PullRequestIntervalSeconds
|
{
|
get { return this._pullRequestIntervalSeconds; }
|
set
|
{
|
this._pullRequestIntervalSeconds = value;
|
if (this._pullRequestTimer != null)
|
this._pullRequestTimer.Change(TimeSpan.FromSeconds(this._pullRequestIntervalSeconds)
|
, TimeSpan.FromSeconds(this._pullRequestIntervalSeconds));
|
}
|
}
|
|
/// <summary>获取或设置自动重连间隔(单位:秒)</summary>
|
public int ReconnectIntervalSeconds
|
{
|
get { return this._reconnectIntervalSeconds; }
|
set
|
{
|
this._reconnectIntervalSeconds = value;
|
if (this._reconnectTimer != null)
|
this._reconnectTimer.Change(TimeSpan.FromSeconds(this._reconnectIntervalSeconds)
|
, TimeSpan.FromSeconds(this._reconnectIntervalSeconds));
|
}
|
}
|
|
/// <summary>以默认分组,初始化TMC客户端</summary>
|
public TmcClient(string appKey, string appSecret) : this(appKey, appSecret, "default") { }
|
|
/// <summary>初始化TMC客户端</summary>
|
public TmcClient(string appKey, string appSecret, string groupName)
|
{
|
this._appSecret = appSecret;
|
this._id = new TmcClientIdentity(appKey, groupName);
|
this.PrepareEndpoint();
|
}
|
|
/// <summary>连接到 TMC Server</summary>
|
/// <param name="uri">TMC server address, eg: ws://mc.api.taobao.com/</param>
|
public void Connect(string uri)
|
{
|
this.running = true;
|
doConnect(uri);
|
this.StartReconnect();
|
this.StartPullRequest();
|
}
|
|
private void doConnect(string uri)
|
{
|
var signHeader = new Dictionary<string, string>();
|
var connHeader = new Dictionary<string, object>();
|
signHeader.Add(Constants.APP_KEY, this._id.AppKey);
|
connHeader.Add(Constants.APP_KEY, signHeader[Constants.APP_KEY]);
|
|
signHeader.Add(GROUP_NAME, this._id.GroupName);
|
connHeader.Add(GROUP_NAME, signHeader[GROUP_NAME]);
|
|
signHeader.Add(Constants.TIMESTAMP, DateTime.Now.Ticks.ToString());
|
connHeader.Add(Constants.TIMESTAMP, signHeader[Constants.TIMESTAMP]);
|
|
connHeader.Add(Constants.SIGN, TopUtils.SignTopRequest(signHeader, this._appSecret, Constants.SIGN_METHOD_MD5));
|
//extra fields
|
connHeader.Add(SDK, Constants.SDK_VERSION);
|
connHeader.Add(INTRANET_IP, TopUtils.GetIntranetIp());
|
this._serverProxy = this._endpoint.GetEndpoint(new TmcServerIdentity(), uri, connHeader);
|
this._uri = uri;
|
this.Log.Info("connected to tmc server: {0}", uri);
|
}
|
|
/// <summary>向指定的主题发布一条与用户无关的消息。</summary>
|
/// <param name="topic">主题名称</param>
|
/// <param name="content">严格根据主题定义的消息内容(JSON/XML)</param>
|
public void Send(string topic, string content)
|
{
|
if (string.IsNullOrEmpty(topic))
|
throw new ArgumentNullException("topic");
|
if (string.IsNullOrEmpty(content))
|
throw new ArgumentNullException("content");
|
|
IDictionary<string, object> msg = new Dictionary<string, object>();
|
msg.Add(MessageFields.KIND, MessageKind.Data);
|
msg.Add(MessageFields.DATA_TOPIC, topic);
|
msg.Add(MessageFields.DATA_CONTENT, content);
|
this._serverProxy.SendAndWait(msg, 2000);
|
}
|
|
/// <summary>向指定的主题发布一条与用户相关的消息。</summary>
|
/// <param name="topic">主题名称</param>
|
/// <param name="content">严格根据主题定义的消息内容(JSON/XML)</param>
|
/// <param name="session">用户授权码</param>
|
public void Send(string topic, string content, string session)
|
{
|
if (string.IsNullOrEmpty(topic))
|
throw new ArgumentNullException("topic");
|
if (string.IsNullOrEmpty(content))
|
throw new ArgumentNullException("content");
|
if (string.IsNullOrEmpty(session))
|
throw new ArgumentNullException("session");
|
|
IDictionary<string, object> msg = new Dictionary<string, object>();
|
msg.Add(MessageFields.KIND, MessageKind.Data);
|
msg.Add(MessageFields.DATA_TOPIC, topic);
|
msg.Add(MessageFields.DATA_CONTENT, content);
|
msg.Add(MessageFields.DATA_INCOMING_USER_SESSION, session);
|
this._serverProxy.SendAndWait(msg, 2000);
|
}
|
|
/// <summary>向服务端发送消息拉取请求</summary>
|
protected internal void PullRequest()
|
{
|
IDictionary<string, object> msg = new Dictionary<string, object>();
|
msg.Add(MessageFields.KIND, MessageKind.PullRequest);
|
this._serverProxy.Send(msg);
|
}
|
|
/// <summary>确认消息</summary>
|
protected internal void Confirm(long id)
|
{
|
IDictionary<string, object> msg = new Dictionary<string, object>();
|
msg.Add(MessageFields.KIND, MessageKind.Confirm);
|
msg.Add(MessageFields.CONFIRM_ID, id);
|
this._serverProxy.Send(msg);
|
}
|
|
/// <summary>确认消息</summary>
|
protected internal void Fail(long id,string errorMsg)
|
{
|
IDictionary<string, object> msg = new Dictionary<string, object>();
|
msg.Add(MessageFields.KIND, MessageKind.Failed);
|
msg.Add(MessageFields.CONFIRM_ID, id);
|
msg.Add(MessageFields.CONFIRM_MSG, errorMsg);
|
|
this._serverProxy.Send(msg);
|
}
|
|
private void PrepareEndpoint()
|
{
|
this.Log = Top.Api.Log.Instance;
|
this._endpoint = new Endpoint(Log, this._id);
|
this._endpoint.ChannelSelector = new ClientChannelSharedSelector(Log) { HeartbeatPeriod = this._heartbeatInterval };
|
this._endpoint.OnMessage += new EventHandler<EndpointContext>(InternalOnMessage);
|
this._endpoint.OnAckMessage += new EventHandler<AckMessageArgs>(InternalOnAckMessage);
|
}
|
|
private void InternalOnMessage(object sender, EndpointContext context)
|
{
|
if (enableTraceLog)
|
{
|
this.Log.Info("messsage from {0}: {1}", context.MessageFrom, this.Dump(context.Message));
|
}
|
|
if (this.OnMessage == null)
|
return;
|
|
ThreadPool.QueueUserWorkItem(o =>
|
{
|
if (!this.running)
|
{
|
this.Log.Info(string.Format("message dropped as client closed: {0}", this.Dump(context.Message)));
|
return;
|
}
|
|
Message msg = this.ParseMessage(context.Message);
|
var args = new MessageArgs(msg, m => this.Confirm(m.Id));
|
var sw = new Stopwatch();
|
try
|
{
|
sw.Start();
|
this.OnMessage(this, args);
|
sw.Stop();
|
}
|
catch (Exception e)
|
{
|
args.Fail(e.Message);
|
}
|
|
if (args._isFail)
|
{
|
this.Log.Info("process message error: {0}", args._reason);
|
this.Fail(msg.Id,args._reason.Length > 128 ? args._reason.Substring(0,128) : args._reason);
|
return;
|
}
|
|
// prevent confirm attach
|
if (sw.ElapsedMilliseconds <= 1)
|
{
|
Thread.Sleep(10);
|
}
|
|
if (args._isConfirmed){
|
return;
|
}
|
|
try
|
{
|
this.Confirm(msg.Id);
|
|
if (enableTraceLog)
|
{
|
this.Log.Info("confirm message topic: {0}, dataid: {1}", msg.Topic, msg.Dataid);
|
}
|
}
|
catch (Exception e)
|
{
|
this.Log.Warn(string.Format("confirm message {0} error {1}", this.Dump(context.Message), e.StackTrace));
|
}
|
});
|
}
|
|
private void InternalOnAckMessage(object sender, AckMessageArgs e)
|
{
|
if (this.Log.IsDebugEnabled())
|
this.Log.Debug("ack messsage from {0}: {1}", e.MessageFrom, e.Message);
|
}
|
|
private void StartReconnect()
|
{
|
if (this._reconnectTimer != null) return;
|
this._reconnectTimer = new Timer(o =>
|
{
|
try
|
{
|
if (!this._serverProxy.hasValidSender())
|
{
|
this.Log.Info("reconning...");
|
this.doConnect(this._uri);
|
}
|
}
|
catch (Exception e)
|
{
|
this.Log.Warn("reconnect error", e);
|
}
|
}, null
|
, TimeSpan.FromSeconds(this._reconnectIntervalSeconds)
|
, TimeSpan.FromSeconds(this._reconnectIntervalSeconds));
|
}
|
|
private void StartPullRequest()
|
{
|
if (this._pullRequestTimer != null) return;
|
this._pullRequestTimer = new Timer(o =>
|
{
|
try
|
{
|
if (this._serverProxy.hasValidSender())
|
{
|
this.PullRequest();
|
}
|
}
|
catch (Exception e)
|
{
|
this.Log.Warn("pull request error", e);
|
}
|
}
|
, null
|
, TimeSpan.FromMilliseconds(500)
|
, TimeSpan.FromSeconds(this.PullRequestIntervalSeconds));
|
}
|
|
private Message ParseMessage(IDictionary<string, object> raw)
|
{
|
var msg = new Message();
|
msg.Id = this.GetValue<long>(raw, MessageFields.OUTGOING_ID);
|
msg.Topic = this.GetValue<string>(raw, MessageFields.DATA_TOPIC);
|
msg.PubAppKey = this.GetValue<string>(raw, MessageFields.DATA_OUTGOING_PUBLISHER);
|
msg.PubTime = this.GetValue<DateTime>(raw, MessageFields.DATA_PUBLISH_TIME);
|
msg.UserId = this.GetValue<long>(raw, MessageFields.DATA_OUTGOING_USER_ID);
|
msg.UserNick = this.GetValue<string>(raw, MessageFields.DATA_OUTGOING_USER_NICK);
|
msg.OutgoingTime = this.GetValue<DateTime>(raw, MessageFields.DATA_ATTACH_OUTGOING_TIME);
|
msg.Dataid = this.GetValue<object>(raw, MessageFields.DATA_DATAID);
|
|
if (!raw.ContainsKey(MessageFields.DATA_CONTENT))
|
return msg;
|
msg.Content = raw[MessageFields.DATA_CONTENT] is byte[]
|
? Encoding.UTF8.GetString(GZIPHelper.Unzip(raw[MessageFields.DATA_CONTENT] as byte[]))
|
: (string)raw[MessageFields.DATA_CONTENT];
|
|
return msg;
|
}
|
|
private T GetValue<T>(IDictionary<string, object> raw, string key)
|
{
|
if (raw.ContainsKey(key))
|
{
|
object value = raw[key];
|
if (value != null)
|
{
|
return (T)value;
|
}
|
}
|
return default(T);
|
}
|
|
private string Dump(IDictionary<string, object> raw)
|
{
|
var buf = new StringBuilder();
|
foreach (var i in raw)
|
buf.AppendFormat("{0}={1}|", i.Key, i.Value);
|
return buf.ToString();
|
}
|
|
public void Close()
|
{
|
this.running = false;
|
if (this._pullRequestTimer != null)
|
{
|
this._pullRequestTimer.Dispose();
|
this._pullRequestTimer = null;
|
}
|
if (this._reconnectTimer != null)
|
{
|
this._reconnectTimer.Dispose();
|
this._reconnectTimer = null;
|
}
|
this._serverProxy.Close(this._uri, "client closed");
|
this.Log.Warn("tmc client closed");
|
}
|
|
public bool Online
|
{
|
get
|
{
|
return this._serverProxy != null && this._serverProxy.hasValidSender();
|
}
|
}
|
}
|
}
|