using Fleck;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Data;
using System.Web;
using WebAPI;
namespace WebApiWithFleck
{
///
/// Fleck WebSocket 服务封装
///
public static class WebSocketServer
{
private static List _connections = new List();
private static ConcurrentDictionary _NameConnectionDict = new ConcurrentDictionary();
private static Fleck.WebSocketServer _server;
// 内置定时器(用于定时查询数据库并推送)
private static System.Timers.Timer _pushTimer;
// 300000
private static readonly int _pushInterval = 10000;
///
/// 启动 WebSocket 服务(监听 18080 端口)
///
public static void Start()
{
// 配置 WebSocket 监听地址(独立端口 8089)
var wsUrl = "ws://0.0.0.0:8089/ws"; // 0.0.0.0 允许外部访问
_server = new Fleck.WebSocketServer(wsUrl);
_pushTimer = new System.Timers.Timer(_pushInterval);
_pushTimer.AutoReset = false; // 非自动重置,避免并发
_pushTimer.Elapsed += OnTimerElapsed; // 绑定静态事件
// 配置 Fleck 服务
_server.Start(connection =>
{
_pushTimer.Start();
// 客户端连接建立时
connection.OnOpen = () =>
{
string userId = "";
string userName = "";
// 获取参数
string path = connection.ConnectionInfo.Path;
Uri dummyUri = new Uri($"http://localhost{path}");
var queryParams = HttpUtility.ParseQueryString(dummyUri.Query);
userId = queryParams["userId"];
userName = queryParams["userName"];
LogService.Write($"WebSocket 客户端连接:{userId}_{userName}-{connection.ConnectionInfo.ClientIpAddress}");
_connections.Add(connection); // 保存连接
_NameConnectionDict[connection] = $@"{userId}_{userName}";
};
// 收到客户端消息时
connection.OnMessage = message =>
{
Console.WriteLine($"收到消息:{message}");
// TODO 心跳信号
};
// 客户端断开连接时
connection.OnClose = () =>
{
LogService.Write($"WebSocket 客户端断开:{connection.ConnectionInfo.ClientIpAddress}");
string success = "";
// 移除连接
_NameConnectionDict.TryRemove(connection,out success);
// _connections.Remove(connection);
};
// 连接出错时
connection.OnError = ex =>
{
string success = "";
LogService.Write($"WebSocket 错误:{ex.Message}");
_NameConnectionDict.TryRemove(connection, out success);
};
});
LogService.Write($"Fleck WebSocket 服务已启动,监听:{wsUrl}");
}
///
/// 停止 WebSocket 服务
///
public static void Stop()
{
_server?.Dispose();
_pushTimer?.Stop();
Console.WriteLine("Fleck WebSocket 服务已停止");
}
///
/// 静态定时器触发事件
///
private static void OnTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
{
SQLHelper.ClsCN oCN = new SQLHelper.ClsCN();
DataSet ds = new DataSet();
try
{
JObject NameMsgKVP = new JObject();
// 获取数据库中未被读取的数据
ds = oCN.RunProcReturn("select * from h_v_OA_WorkLinkBillAllList where 单据状态 = '已审核' and 阅读状态 = '未阅'", "h_v_OA_WorkLinkBillAllList");
// 判断广播还是私发 ,统计数据
JArray dataTableJArray = JArray.Parse(JsonConvert.SerializeObject(ds.Tables[0]));
foreach (JToken row in dataTableJArray)
{
// 提前获取并处理行数据(避免重复访问 row["列名"],且处理 null)
string receiver = row["接收人"]?.ToString() ?? string.Empty; // 为 null 时返回空字符串
string ccUsers = row["所有抄送人"]?.ToString() ?? string.Empty;
string sendType = row["发送类型"]?.ToString() ?? string.Empty;
// 跳过无效数据行(三列均为空时,无需处理)
if (string.IsNullOrEmpty(receiver) && string.IsNullOrEmpty(ccUsers) && string.IsNullOrEmpty(sendType))
continue;
foreach (var dictOne in _NameConnectionDict)
{
string UName = dictOne.Value.Split('_')[1];
// 将接收人 抄送人 获取 广播的消息添加到消息缓存
if (string.Equals(row["接收人"].ToString(), UName)
|| row["所有抄送人"].ToString().Contains(UName)
|| string.Equals(row["发送类型"].ToString(), "公共"))
{
string dictOneVal = dictOne.Value.ToString();
if (NameMsgKVP[dictOneVal] == null)
{
NameMsgKVP[dictOneVal] = new JArray();
}
var jArray = NameMsgKVP[dictOneVal] as JArray;
jArray.Add(row);
}
}
}
// 根据用户ID分发到不同的客户端
foreach (var dictOne in _NameConnectionDict)
{
if(NameMsgKVP[dictOne.Value.ToString()] != null)
{
// 封装 websocket 信息
dictOne.Key.Send(new JObject{
["Type"]="Message",
["Content"]= NameMsgKVP[dictOne.Value.ToString()].ToString()
}.ToString());
}
}
LogService.Write("消息推送完成...");
}
catch (Exception ex)
{
LogService.Write(ex.ToString());
return;
}
finally
{
// 静态定时器手动重启
if (_pushTimer != null && !_pushTimer.Enabled)
{
_pushTimer.Start();
}
}
}
}
}