using Fleck; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.Specialized; using System.Data; using System.Web; using WebAPI; namespace WebApiWithFleck { /// /// Fleck WebSocket 服务封装 /// public static class WebSocketServer { private static List _connections = new List(); private static ConcurrentDictionary _NameConnectionDict = new ConcurrentDictionary(); private static Fleck.WebSocketServer _server; // 内置定时器(用于定时查询数据库并推送) private static System.Timers.Timer _pushTimer; // 300000 private static readonly int _pushInterval = 10000; /// /// 启动 WebSocket 服务(监听 18080 端口) /// public static void Start() { // 配置 WebSocket 监听地址(独立端口 8089) var wsUrl = "ws://0.0.0.0:8089/ws"; // 0.0.0.0 允许外部访问 _server = new Fleck.WebSocketServer(wsUrl); _pushTimer = new System.Timers.Timer(_pushInterval); _pushTimer.AutoReset = false; // 非自动重置,避免并发 _pushTimer.Elapsed += OnTimerElapsed; // 绑定静态事件 // 配置 Fleck 服务 _server.Start(connection => { _pushTimer.Start(); // 客户端连接建立时 connection.OnOpen = () => { string userId = ""; string userName = ""; // 获取参数 string path = connection.ConnectionInfo.Path; Uri dummyUri = new Uri($"http://localhost{path}"); var queryParams = HttpUtility.ParseQueryString(dummyUri.Query); userId = queryParams["userId"]; userName = queryParams["userName"]; LogService.Write($"WebSocket 客户端连接:{userId}_{userName}-{connection.ConnectionInfo.ClientIpAddress}"); _connections.Add(connection); // 保存连接 _NameConnectionDict[connection] = $@"{userId}_{userName}"; }; // 收到客户端消息时 connection.OnMessage = message => { Console.WriteLine($"收到消息:{message}"); // TODO 心跳信号 }; // 客户端断开连接时 connection.OnClose = () => { LogService.Write($"WebSocket 客户端断开:{connection.ConnectionInfo.ClientIpAddress}"); string success = ""; // 移除连接 _NameConnectionDict.TryRemove(connection,out success); // _connections.Remove(connection); }; // 连接出错时 connection.OnError = ex => { string success = ""; LogService.Write($"WebSocket 错误:{ex.Message}"); _NameConnectionDict.TryRemove(connection, out success); }; }); LogService.Write($"Fleck WebSocket 服务已启动,监听:{wsUrl}"); } /// /// 停止 WebSocket 服务 /// public static void Stop() { _server?.Dispose(); _pushTimer?.Stop(); Console.WriteLine("Fleck WebSocket 服务已停止"); } /// /// 静态定时器触发事件 /// private static void OnTimerElapsed(object sender, System.Timers.ElapsedEventArgs e) { SQLHelper.ClsCN oCN = new SQLHelper.ClsCN(); DataSet ds = new DataSet(); try { JObject NameMsgKVP = new JObject(); // 获取数据库中未被读取的数据 ds = oCN.RunProcReturn("select * from h_v_OA_WorkLinkBillAllList where 单据状态 = '已审核' and 阅读状态 = '未阅'", "h_v_OA_WorkLinkBillAllList"); // 判断广播还是私发 ,统计数据 JArray dataTableJArray = JArray.Parse(JsonConvert.SerializeObject(ds.Tables[0])); foreach (JToken row in dataTableJArray) { // 提前获取并处理行数据(避免重复访问 row["列名"],且处理 null) string receiver = row["接收人"]?.ToString() ?? string.Empty; // 为 null 时返回空字符串 string ccUsers = row["所有抄送人"]?.ToString() ?? string.Empty; string sendType = row["发送类型"]?.ToString() ?? string.Empty; // 跳过无效数据行(三列均为空时,无需处理) if (string.IsNullOrEmpty(receiver) && string.IsNullOrEmpty(ccUsers) && string.IsNullOrEmpty(sendType)) continue; foreach (var dictOne in _NameConnectionDict) { string UName = dictOne.Value.Split('_')[1]; // 将接收人 抄送人 获取 广播的消息添加到消息缓存 if (string.Equals(row["接收人"].ToString(), UName) || row["所有抄送人"].ToString().Contains(UName) || string.Equals(row["发送类型"].ToString(), "公共")) { string dictOneVal = dictOne.Value.ToString(); if (NameMsgKVP[dictOneVal] == null) { NameMsgKVP[dictOneVal] = new JArray(); } var jArray = NameMsgKVP[dictOneVal] as JArray; jArray.Add(row); } } } // 根据用户ID分发到不同的客户端 foreach (var dictOne in _NameConnectionDict) { if(NameMsgKVP[dictOne.Value.ToString()] != null) { // 封装 websocket 信息 dictOne.Key.Send(new JObject{ ["Type"]="Message", ["Content"]= NameMsgKVP[dictOne.Value.ToString()].ToString() }.ToString()); } } LogService.Write("消息推送完成..."); } catch (Exception ex) { LogService.Write(ex.ToString()); return; } finally { // 静态定时器手动重启 if (_pushTimer != null && !_pushTimer.Enabled) { _pushTimer.Start(); } } } } }