1
llj
2 天以前 3e073e3d25f894fa43b800b6ee3c44a847203ddf
WebAPI/WebSocketServer.cs
New file
@@ -0,0 +1,212 @@
using Fleck;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Data;
using System.Web;
using WebAPI;
namespace WebApiWithFleck
{
    /// <summary>
    /// Fleck WebSocket 服务封装
    /// </summary>
    public static class WebSocketServer
    {
        private static List<IWebSocketConnection> _connections = new List<IWebSocketConnection>();
        // 套接字连接 与 用户标识符 字典
        private static ConcurrentDictionary<IWebSocketConnection, string> _NameConnectionDict = new ConcurrentDictionary<IWebSocketConnection, string>();
        private static Fleck.WebSocketServer _server;
        // 内置定时器(用于定时查询数据库并推送)
        private static System.Timers.Timer _pushTimer;
        // 内置定时器(用于发送心跳信号)
        private static System.Timers.Timer _pushTimerBeat;
        // 300000
        private static readonly int _pushInterval = 300000; // 每5分钟发送一次消息
        /// <summary>
        /// 启动 WebSocket 服务(监听 8089 端口)
        /// </summary>
        public static void Start()
        {
            // 配置 WebSocket 监听地址(独立端口 8089)
            var wsUrl = "ws://0.0.0.0:8089/ws"; // 0.0.0.0 允许外部访问
            _server = new Fleck.WebSocketServer(wsUrl);
            _pushTimer = new System.Timers.Timer(_pushInterval);
            _pushTimer.AutoReset = false; // 非自动重置,避免并发
            _pushTimer.Elapsed += OnTimerElapsed; // 绑定静态事件
            _pushTimerBeat = new System.Timers.Timer(30000);
            _pushTimerBeat.AutoReset = false; // 非自动重置,避免并发
            _pushTimerBeat.Elapsed += BeatSignalSender; // 绑定静态事件
            // 配置 Fleck 服务
            _server.Start(connection =>
            {
                _pushTimer.Start();
                // 客户端连接建立时
                connection.OnOpen = () =>
                {
                    string userId = "";
                    string userName = "";
                    // 获取参数
                    string path = connection.ConnectionInfo.Path;
                    Uri dummyUri = new Uri($"http://localhost{path}");
                    var queryParams = HttpUtility.ParseQueryString(dummyUri.Query);
                    userId = queryParams["userId"];
                    userName = queryParams["userName"];
                    LogService.Write($"WebSocket 客户端连接:{userId}_{userName}-{connection.ConnectionInfo.ClientIpAddress}");
                    _connections.Add(connection); // 保存连接
                    _NameConnectionDict[connection] =  $@"{userId}_{userName}";
                };
                // 收到客户端消息时
                connection.OnMessage = message =>
                {
                    Console.WriteLine($"收到消息:{message}");
                };
                // 客户端断开连接时
                connection.OnClose = () =>
                {
                    LogService.Write($"WebSocket 客户端断开:{connection.ConnectionInfo.ClientIpAddress}");
                    string success = "";
                    // 移除连接
                    _NameConnectionDict.TryRemove(connection,out success);
                    // _connections.Remove(connection);
                };
                // 连接出错时
                connection.OnError = ex =>
                {
                    string success = "";
                    LogService.Write($"WebSocket 错误:{ex.Message}");
                    _NameConnectionDict.TryRemove(connection, out success);
                };
            });
            LogService.Write($"Fleck WebSocket 服务已启动,监听:{wsUrl}");
        }
        /// <summary>
        /// 停止 WebSocket 服务
        /// </summary>
        public static void Stop()
        {
            _server?.Dispose();
            _pushTimer?.Stop();
            _pushTimerBeat?.Stop();
            Console.WriteLine("Fleck WebSocket 服务已停止");
        }
        /// <summary>
        /// 静态定时器触发事件 推送 未读消息
        /// </summary>
        private static void OnTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            SQLHelper.ClsCN oCN = new SQLHelper.ClsCN();
            DataSet ds = new DataSet();
            try
            {
                JObject NameMsgKVP = new JObject();
                // 获取数据库中未被读取的数据
                ds = oCN.RunProcReturn("select * from h_v_OA_WorkLinkBillAllList where 单据状态 = '已审核' and 阅读状态 = '未阅'", "h_v_OA_WorkLinkBillAllList");
                // 判断广播还是私发 ,统计数据
                JArray dataTableJArray = JArray.Parse(JsonConvert.SerializeObject(ds.Tables[0]));
                foreach (JToken row in dataTableJArray)
                {
                    // 提前获取并处理行数据(避免重复访问 row["列名"],且处理 null)
                    string receiver = row["接收人"]?.ToString() ?? string.Empty; // 为 null 时返回空字符串
                    string ccUsers = row["所有抄送人"]?.ToString() ?? string.Empty;
                    string sendType = row["发送类型"]?.ToString() ?? string.Empty;
                    // 跳过无效数据行(三列均为空时,无需处理)
                    if (string.IsNullOrEmpty(receiver) && string.IsNullOrEmpty(ccUsers) && string.IsNullOrEmpty(sendType))
                        continue;
                    foreach (var dictOne in _NameConnectionDict)
                    {
                        string UName = dictOne.Value.Split('_')[1];
                        // 将接收人 抄送人 获取 广播的消息添加到消息缓存
                        if (string.Equals(row["接收人"].ToString(), UName)
                            || row["所有抄送人"].ToString().Contains(UName)
                            || string.Equals(row["发送类型"].ToString(), "公共"))
                        {
                            string dictOneVal = dictOne.Value.ToString();
                            if (NameMsgKVP[dictOneVal] == null)
                            {
                                NameMsgKVP[dictOneVal] = new JArray();
                            }
                            var jArray = NameMsgKVP[dictOneVal] as JArray;
                            jArray.Add(row);
                        }
                    }
                }
                // 根据用户ID分发到不同的客户端
                foreach (var dictOne in _NameConnectionDict)
                {
                    if(NameMsgKVP[dictOne.Value.ToString()] != null)
                    {
                        // 封装 websocket 信息
                        dictOne.Key.Send(new JObject{
                            ["Type"]="Message",
                            ["Content"]= NameMsgKVP[dictOne.Value.ToString()].ToString()
                        }.ToString());
                    }
                }
                LogService.Write("消息推送完成...");
            }
            catch (Exception ex)
            {
                LogService.Write(ex.ToString());
                return;
            }
            finally
            {
                // 静态定时器手动重启
                if (_pushTimer != null && !_pushTimer.Enabled)
                {
                    _pushTimer.Start();
                }
            }
        }
        // 定时器发送心跳信号
        private static void BeatSignalSender(object sender, System.Timers.ElapsedEventArgs e)
        {
            try
            {
                foreach (var dictOne in _NameConnectionDict)
                {
                    LogService.Write($@"服务端 向 {dictOne.Value} 发送心跳信号");
                    dictOne.Key.SendPing(new byte[0]);
                }
            }
            catch (Exception ex)
            {
                LogService.Write(ex.ToString());
                return;
            }
            finally
            {
                // 静态定时器手动重启
                if (_pushTimer != null && !_pushTimer.Enabled)
                {
                    _pushTimer.Start();
                }
            }
        }
    }
}