using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using Taobao.Top.Link.Channel;
using Top.Api;

namespace Taobao.Top.Link.Endpoints
{
    // Abstract network model
    // https://docs.google.com/drawings/d/1PRfzMVNGE4NKkpD9A_-QlH2PV47MFumZX8LbCwhzpQg/edit

    // NOTE(review): the generic type arguments on IList/List, EventHandler and
    // IDictionary/Dictionary look like they were lost from this copy of the file
    // (e.g. "new List()" cannot compile with only System.Collections.Generic imported).
    // They are left exactly as found here; confirm against the real declarations.

    /// <summary>
    /// Local endpoint: owns an <see cref="Identity"/>, keeps the proxies created for
    /// remote endpoints, and relays the callbacks of its <c>EndpointHandler</c> to the
    /// <see cref="OnMessage"/> and <see cref="OnAckMessage"/> events.
    /// </summary>
    public sealed class Endpoint
    {
        // Connect timeout passed to EndpointHandler.SendAndWait (10000 = 10 seconds).
        internal static int TIMEOUT = 10000;

        private readonly ITopLogger _log;
        // in/out endpoints
        private readonly IList _connected;
        private readonly EndpointHandler _handler;

        /// <summary>
        /// Gets or sets the selector used to obtain the client channel for a target uri.
        /// </summary>
        public IClientChannelSelector ChannelSelector { get; set; }

        /// <summary>
        /// Raised when the handler reports a received message, see RTT based.
        /// </summary>
        public event EventHandler OnMessage;

        /// <summary>
        /// Raised when the handler reports a received ack message, see RTT based.
        /// </summary>
        public event EventHandler OnAckMessage;

        /// <summary>
        /// Gets the identity of this endpoint.
        /// </summary>
        public Identity Identity { get; private set; }

        /// <summary>
        /// Creates an endpoint that logs through <c>Log.Instance</c>.
        /// </summary>
        /// <param name="identity">identity of this endpoint</param>
        public Endpoint(Identity identity)
            : this(Log.Instance, identity)
        {
        }

        /// <summary>
        /// Creates an endpoint with a shared channel selector and a handler whose
        /// message / ack callbacks are forwarded to this endpoint's events.
        /// </summary>
        /// <param name="logger">logger handed to the channel selector and the handler</param>
        /// <param name="identity">identity of this endpoint</param>
        public Endpoint(ITopLogger logger, Identity identity)
        {
            this._connected = new List();
            this._log = logger;
            this.Identity = identity;
            this.ChannelSelector = new ClientChannelSharedSelector(logger);
            this._handler = new EndpointHandler(logger);

            // An event with no subscribers is null; snapshot it and check before
            // invoking so an incoming message cannot fail with NullReferenceException.
            this._handler.MessageHandler = ctx =>
            {
                var onMessage = this.OnMessage;
                if (onMessage != null)
                {
                    onMessage(this, ctx);
                }
            };
            this._handler.AckMessageHandler = (m, i) =>
            {
                var onAckMessage = this.OnAckMessage;
                if (onAckMessage != null)
                {
                    onAckMessage(this, new AckMessageArgs(m, i));
                }
            };
        }

        /// <summary>
        /// Gets an already known endpoint proxy by id.
        /// </summary>
        /// <param name="target">id of the endpoint to look up</param>
        /// <returns>the proxy whose identity equals <paramref name="target"/>, or null when none is known</returns>
        /// <exception cref="ArgumentNullException"><paramref name="target"/> is null</exception>
        /// <exception cref="LinkException"><paramref name="target"/> equals this endpoint's own identity</exception>
        [MethodImplAttribute(MethodImplOptions.Synchronized)]
        public EndpointProxy GetEndpoint(Identity target)
        {
            if (target == null)
            {
                throw new ArgumentNullException("target");
            }

            if (target.Equals(this.Identity))
            {
                throw new LinkException(Text.E_ID_DUPLICATE);
            }

            foreach (EndpointProxy p in this._connected)
            {
                if (p.Identity != null && p.Identity.Equals(target))
                {
                    return p;
                }
            }

            return null;
        }

        /// <summary>
        /// Connects to an endpoint without passing extra connect data.
        /// </summary>
        /// <param name="target">target id</param>
        /// <param name="uri">target address</param>
        /// <returns>the proxy for the target endpoint</returns>
        public EndpointProxy GetEndpoint(Identity target, string uri)
        {
            return this.GetEndpoint(target, uri, null);
        }

        /// <summary>
        /// Connects to an endpoint: reuses the known proxy for <paramref name="target"/> or
        /// creates one, obtains a channel for <paramref name="uri"/>, sends a CONNECT message
        /// and waits for it (up to <see cref="TIMEOUT"/>) before adding the channel to the proxy.
        /// </summary>
        /// <param name="target">target id</param>
        /// <param name="uri">target address</param>
        /// <param name="extras">entries added to the connect message content; may be null</param>
        /// <returns>the proxy for the target endpoint</returns>
        [MethodImplAttribute(MethodImplOptions.Synchronized)]
        public EndpointProxy GetEndpoint(Identity target, string uri, IDictionary extras)
        {
            EndpointProxy e = this.GetEndpoint(target) ?? this.CreateProxy();
            e.Identity = target;
            // always clear, cached proxy will have broken channel
            e.Remove(uri);
            // always reget channel, make sure it's valid
            IClientChannel channel = this.ChannelSelector.GetChannel(new Uri(uri));

            // connect message
            Message msg = new Message();
            msg.MessageType = MessageType.CONNECT;
            IDictionary content = new Dictionary();
            this.Identity.Render(content);
            // pass extra data
            if (extras != null)
            {
                foreach (var p in extras)
                {
                    content.Add(p);
                }
            }
            msg.Content = content;

            // The channel is attached to the proxy only after the connect round trip returns.
            this._handler.SendAndWait(e, channel, msg, TIMEOUT);
            e.Add(channel);
            return e;
        }

        // Creates a proxy bound to this endpoint's handler and records it in _connected.
        private EndpointProxy CreateProxy()
        {
            EndpointProxy e = new EndpointProxy(this._handler);
            this._connected.Add(e);
            return e;
        }
    }
}