using System;
|
using System.Collections.Generic;
|
using System.IO;
|
using System.Text;
|
using Taobao.Top.Link.Channel;
|
using Top.Api;
|
|
namespace Taobao.Top.Link.Endpoints
|
{
|
/// <summary>deal with protocol/callback/send
|
/// </summary>
|
public class EndpointHandler
|
{
|
private ITopLogger _log;
|
private int _flag;
|
//all connect in/out endpoints
|
private IDictionary<string, Identity> _idByToken;
|
private IDictionary<int, SendCallback> _callbacks;
|
private EventHandler<ChannelContext> _onMessage;
|
|
public Action<EndpointContext> MessageHandler { get; set; }
|
public OnAckMessage AckMessageHandler { get; set; }
|
|
public EndpointHandler(ITopLogger logger)
|
{
|
this._log = logger;
|
this._idByToken = new Dictionary<string, Identity>();
|
this._callbacks = new Dictionary<int, SendCallback>();
|
this._onMessage = new EventHandler<ChannelContext>(this.OnMessage);
|
}
|
|
public void Send(Message message, IChannelSender sender)
|
{
|
this.Send(message, sender, null);
|
}
|
public void Send(Message message, IChannelSender sender, SendCallback callback)
|
{
|
if (callback != null)
|
{
|
message.Flag = System.Threading.Interlocked.Increment(ref this._flag);
|
this._callbacks.Add(message.Flag, callback);
|
}
|
using (var s = new MemoryStream())
|
{
|
MessageIO.WriteMessage(s, message);
|
this.GetChannel(sender).Send(s.ToArray());
|
}
|
}
|
|
internal IDictionary<string, object> SendAndWait(EndpointProxy e
|
, IChannelSender sender
|
, Message message
|
, int timeout)
|
{
|
SendCallback callback = new SendCallback(e);
|
this.Send(message, sender, callback);
|
callback.WaitReturn(timeout);
|
if (callback.Error != null)
|
throw callback.Error;
|
return callback.Return;
|
}
|
|
private IClientChannel GetChannel(IChannelSender sender)
|
{
|
var channel = sender as IClientChannel;
|
if (channel.OnMessage == null)
|
channel.OnMessage = this._onMessage;
|
return channel;
|
}
|
private void OnMessage(object sender, ChannelContext ctx)
|
{
|
Message msg = MessageIO.ReadMessage(new MemoryStream((byte[])ctx.Message));
|
SendCallback callback = this._callbacks.ContainsKey(msg.Flag)
|
? this._callbacks[msg.Flag]
|
: null;
|
|
if (msg.MessageType == MessageType.CONNECTACK)
|
{
|
this.HandleConnectAck(callback, msg);
|
return;
|
}
|
|
Identity msgFrom = msg.Token != null && this._idByToken.ContainsKey(msg.Token)
|
? this._idByToken[msg.Token]
|
: null;
|
// must CONNECT/CONNECTACK for got token before SEND
|
if (msgFrom == null)
|
{
|
var error = new LinkException(Text.E_UNKNOWN_MSG_FROM);
|
if (callback == null)
|
throw error;
|
callback.Error = error;
|
return;
|
}
|
|
#region raise callback of client
|
if (callback != null)
|
{
|
this.HandleCallback(callback, msg, msgFrom);
|
return;
|
}
|
else if (this.IsError(msg))
|
{
|
this._log.Error(Text.E_GOT_ERROR, msg.StatusCode, msg.StatusPhase);
|
return;
|
}
|
#endregion
|
|
#region raise event
|
if (msg.MessageType == MessageType.SENDACK)
|
{
|
if (this.AckMessageHandler != null)
|
this.AckMessageHandler(msg.Content, msgFrom);
|
return;
|
}
|
|
if (this.MessageHandler == null)
|
return;
|
EndpointContext endpointContext = new EndpointContext(ctx, this, msgFrom, msg.Flag, msg.Token);
|
endpointContext.Message = msg.Content;
|
try
|
{
|
this.MessageHandler(endpointContext);
|
}
|
catch (Exception e)
|
{
|
// onMessage error should be reply to client
|
if (e is LinkException)
|
endpointContext.Error(
|
((LinkException)e).ErrorCode,
|
((LinkException)e).Message);
|
else
|
endpointContext.Error(0, e.Message);
|
}
|
#endregion
|
}
|
private void HandleConnectAck(SendCallback callback, Message msg)
|
{
|
if (callback == null)
|
throw new LinkException(Text.E_NO_CALLBACK);
|
if (this.IsError(msg))
|
callback.Error = new LinkException(msg.StatusCode, msg.StatusPhase);
|
else
|
{
|
callback.Return = null;
|
// set token for proxy for sending message next time
|
callback.Target.Token = msg.Token;
|
// store token from target endpoint for receiving it's message
|
// next time
|
if (this._idByToken.ContainsKey(msg.Token))
|
this._idByToken[msg.Token] = callback.Target.Identity;
|
else
|
this._idByToken.Add(msg.Token, callback.Target.Identity);
|
|
this._log.Info(Text.E_CONNECT_SUCCESS, callback.Target.Identity, msg.Token);
|
}
|
}
|
private void HandleCallback(SendCallback callback, Message msg, Identity msgFrom)
|
{
|
if (!callback.Target.Identity.Equals(msgFrom))
|
{
|
this._log.Warn(Text.E_IDENTITY_NOT_MATCH_WITH_CALLBACK, msgFrom, callback.Target.Identity);
|
return;
|
}
|
if (this.IsError(msg))
|
callback.Error = new LinkException(msg.StatusCode, msg.StatusPhase);
|
else
|
callback.Return = msg.Content;
|
}
|
private bool IsError(Message msg)
|
{
|
return msg.StatusCode > 0 || !string.IsNullOrEmpty(msg.StatusPhase);
|
}
|
|
public delegate void OnAckMessage(IDictionary<string, object> message, Identity messageFrom);
|
}
|
}
|