using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using Taobao.Top.Link.Channel;
using Taobao.Top.Link.Endpoints;
using Taobao.Top.Link.Util;
using Top.Api;
using Top.Api.Util;
namespace Top.Tmc
{
/// 消息服务客户端
public class TmcClient
{
// sign parameters
private const string GROUP_NAME = "group_name";
private const string SDK = "sdk";
private const string INTRANET_IP = "intranet_ip";
private TmcClientIdentity _id;
private string _appSecret;
private string _uri;
private Endpoint _endpoint;
private EndpointProxy _serverProxy;
private volatile bool running;
private int _heartbeatInterval = 45000; // 心跳频率(单位:毫秒)
private int _reconnectIntervalSeconds = 15; // 重连周期(单位:秒)
private int _pullRequestIntervalSeconds = 30; // 定时获取消息周期(单位:秒)
private Timer _reconnectTimer;
private Timer _pullRequestTimer;
public event EventHandler OnMessage;
/// 获取或设置Log
public ITopLogger Log { get; set; }
private bool enableTraceLog = true;
public bool EnableTraceLog
{
get { return this.enableTraceLog;}
set { this.enableTraceLog = value; }
}
/// 获取或设置定时发送拉取请求的周期(单位:秒)
public int PullRequestIntervalSeconds
{
get { return this._pullRequestIntervalSeconds; }
set
{
this._pullRequestIntervalSeconds = value;
if (this._pullRequestTimer != null)
this._pullRequestTimer.Change(TimeSpan.FromSeconds(this._pullRequestIntervalSeconds)
, TimeSpan.FromSeconds(this._pullRequestIntervalSeconds));
}
}
/// 获取或设置自动重连间隔(单位:秒)
public int ReconnectIntervalSeconds
{
get { return this._reconnectIntervalSeconds; }
set
{
this._reconnectIntervalSeconds = value;
if (this._reconnectTimer != null)
this._reconnectTimer.Change(TimeSpan.FromSeconds(this._reconnectIntervalSeconds)
, TimeSpan.FromSeconds(this._reconnectIntervalSeconds));
}
}
/// 以默认分组,初始化TMC客户端
public TmcClient(string appKey, string appSecret) : this(appKey, appSecret, "default") { }
/// 初始化TMC客户端
public TmcClient(string appKey, string appSecret, string groupName)
{
this._appSecret = appSecret;
this._id = new TmcClientIdentity(appKey, groupName);
this.PrepareEndpoint();
}
/// 连接到 TMC Server
/// TMC server address, eg: ws://mc.api.taobao.com/
public void Connect(string uri)
{
this.running = true;
doConnect(uri);
this.StartReconnect();
this.StartPullRequest();
}
private void doConnect(string uri)
{
var signHeader = new Dictionary();
var connHeader = new Dictionary();
signHeader.Add(Constants.APP_KEY, this._id.AppKey);
connHeader.Add(Constants.APP_KEY, signHeader[Constants.APP_KEY]);
signHeader.Add(GROUP_NAME, this._id.GroupName);
connHeader.Add(GROUP_NAME, signHeader[GROUP_NAME]);
signHeader.Add(Constants.TIMESTAMP, DateTime.Now.Ticks.ToString());
connHeader.Add(Constants.TIMESTAMP, signHeader[Constants.TIMESTAMP]);
connHeader.Add(Constants.SIGN, TopUtils.SignTopRequest(signHeader, this._appSecret, Constants.SIGN_METHOD_MD5));
//extra fields
connHeader.Add(SDK, Constants.SDK_VERSION);
connHeader.Add(INTRANET_IP, TopUtils.GetIntranetIp());
this._serverProxy = this._endpoint.GetEndpoint(new TmcServerIdentity(), uri, connHeader);
this._uri = uri;
this.Log.Info("connected to tmc server: {0}", uri);
}
/// 向指定的主题发布一条与用户无关的消息。
/// 主题名称
/// 严格根据主题定义的消息内容(JSON/XML)
public void Send(string topic, string content)
{
if (string.IsNullOrEmpty(topic))
throw new ArgumentNullException("topic");
if (string.IsNullOrEmpty(content))
throw new ArgumentNullException("content");
IDictionary msg = new Dictionary();
msg.Add(MessageFields.KIND, MessageKind.Data);
msg.Add(MessageFields.DATA_TOPIC, topic);
msg.Add(MessageFields.DATA_CONTENT, content);
this._serverProxy.SendAndWait(msg, 2000);
}
/// 向指定的主题发布一条与用户相关的消息。
/// 主题名称
/// 严格根据主题定义的消息内容(JSON/XML)
/// 用户授权码
public void Send(string topic, string content, string session)
{
if (string.IsNullOrEmpty(topic))
throw new ArgumentNullException("topic");
if (string.IsNullOrEmpty(content))
throw new ArgumentNullException("content");
if (string.IsNullOrEmpty(session))
throw new ArgumentNullException("session");
IDictionary msg = new Dictionary();
msg.Add(MessageFields.KIND, MessageKind.Data);
msg.Add(MessageFields.DATA_TOPIC, topic);
msg.Add(MessageFields.DATA_CONTENT, content);
msg.Add(MessageFields.DATA_INCOMING_USER_SESSION, session);
this._serverProxy.SendAndWait(msg, 2000);
}
/// 向服务端发送消息拉取请求
protected internal void PullRequest()
{
IDictionary msg = new Dictionary();
msg.Add(MessageFields.KIND, MessageKind.PullRequest);
this._serverProxy.Send(msg);
}
/// 确认消息
protected internal void Confirm(long id)
{
IDictionary msg = new Dictionary();
msg.Add(MessageFields.KIND, MessageKind.Confirm);
msg.Add(MessageFields.CONFIRM_ID, id);
this._serverProxy.Send(msg);
}
/// 确认消息
protected internal void Fail(long id,string errorMsg)
{
IDictionary msg = new Dictionary();
msg.Add(MessageFields.KIND, MessageKind.Failed);
msg.Add(MessageFields.CONFIRM_ID, id);
msg.Add(MessageFields.CONFIRM_MSG, errorMsg);
this._serverProxy.Send(msg);
}
private void PrepareEndpoint()
{
this.Log = Top.Api.Log.Instance;
this._endpoint = new Endpoint(Log, this._id);
this._endpoint.ChannelSelector = new ClientChannelSharedSelector(Log) { HeartbeatPeriod = this._heartbeatInterval };
this._endpoint.OnMessage += new EventHandler(InternalOnMessage);
this._endpoint.OnAckMessage += new EventHandler(InternalOnAckMessage);
}
private void InternalOnMessage(object sender, EndpointContext context)
{
if (enableTraceLog)
{
this.Log.Info("messsage from {0}: {1}", context.MessageFrom, this.Dump(context.Message));
}
if (this.OnMessage == null)
return;
ThreadPool.QueueUserWorkItem(o =>
{
if (!this.running)
{
this.Log.Info(string.Format("message dropped as client closed: {0}", this.Dump(context.Message)));
return;
}
Message msg = this.ParseMessage(context.Message);
var args = new MessageArgs(msg, m => this.Confirm(m.Id));
var sw = new Stopwatch();
try
{
sw.Start();
this.OnMessage(this, args);
sw.Stop();
}
catch (Exception e)
{
args.Fail(e.Message);
}
if (args._isFail)
{
this.Log.Info("process message error: {0}", args._reason);
this.Fail(msg.Id,args._reason.Length > 128 ? args._reason.Substring(0,128) : args._reason);
return;
}
// prevent confirm attach
if (sw.ElapsedMilliseconds <= 1)
{
Thread.Sleep(10);
}
if (args._isConfirmed){
return;
}
try
{
this.Confirm(msg.Id);
if (enableTraceLog)
{
this.Log.Info("confirm message topic: {0}, dataid: {1}", msg.Topic, msg.Dataid);
}
}
catch (Exception e)
{
this.Log.Warn(string.Format("confirm message {0} error {1}", this.Dump(context.Message), e.StackTrace));
}
});
}
private void InternalOnAckMessage(object sender, AckMessageArgs e)
{
if (this.Log.IsDebugEnabled())
this.Log.Debug("ack messsage from {0}: {1}", e.MessageFrom, e.Message);
}
private void StartReconnect()
{
if (this._reconnectTimer != null) return;
this._reconnectTimer = new Timer(o =>
{
try
{
if (!this._serverProxy.hasValidSender())
{
this.Log.Info("reconning...");
this.doConnect(this._uri);
}
}
catch (Exception e)
{
this.Log.Warn("reconnect error", e);
}
}, null
, TimeSpan.FromSeconds(this._reconnectIntervalSeconds)
, TimeSpan.FromSeconds(this._reconnectIntervalSeconds));
}
private void StartPullRequest()
{
if (this._pullRequestTimer != null) return;
this._pullRequestTimer = new Timer(o =>
{
try
{
if (this._serverProxy.hasValidSender())
{
this.PullRequest();
}
}
catch (Exception e)
{
this.Log.Warn("pull request error", e);
}
}
, null
, TimeSpan.FromMilliseconds(500)
, TimeSpan.FromSeconds(this.PullRequestIntervalSeconds));
}
private Message ParseMessage(IDictionary raw)
{
var msg = new Message();
msg.Id = this.GetValue(raw, MessageFields.OUTGOING_ID);
msg.Topic = this.GetValue(raw, MessageFields.DATA_TOPIC);
msg.PubAppKey = this.GetValue(raw, MessageFields.DATA_OUTGOING_PUBLISHER);
msg.PubTime = this.GetValue(raw, MessageFields.DATA_PUBLISH_TIME);
msg.UserId = this.GetValue(raw, MessageFields.DATA_OUTGOING_USER_ID);
msg.UserNick = this.GetValue(raw, MessageFields.DATA_OUTGOING_USER_NICK);
msg.OutgoingTime = this.GetValue(raw, MessageFields.DATA_ATTACH_OUTGOING_TIME);
msg.Dataid = this.GetValue