using Fleck;
|
using Newtonsoft.Json;
|
using Newtonsoft.Json.Linq;
|
using System;
|
using System.Collections.Concurrent;
|
using System.Collections.Generic;
|
using System.Collections.Specialized;
|
using System.Data;
|
using System.Web;
|
using WebAPI;
|
|
namespace WebApiWithFleck
|
{
|
/// <summary>
|
/// Fleck WebSocket 服务封装
|
/// </summary>
|
public static class WebSocketServer
|
{
|
private static List<IWebSocketConnection> _connections = new List<IWebSocketConnection>();
|
private static ConcurrentDictionary<IWebSocketConnection, string> _NameConnectionDict = new ConcurrentDictionary<IWebSocketConnection, string>();
|
private static Fleck.WebSocketServer _server;
|
// 内置定时器(用于定时查询数据库并推送)
|
private static System.Timers.Timer _pushTimer;
|
// 300000
|
private static readonly int _pushInterval = 10000;
|
|
/// <summary>
|
/// 启动 WebSocket 服务(监听 18080 端口)
|
/// </summary>
|
public static void Start()
|
{
|
// 配置 WebSocket 监听地址(独立端口 8089)
|
var wsUrl = "ws://0.0.0.0:8089/ws"; // 0.0.0.0 允许外部访问
|
_server = new Fleck.WebSocketServer(wsUrl);
|
|
_pushTimer = new System.Timers.Timer(_pushInterval);
|
_pushTimer.AutoReset = false; // 非自动重置,避免并发
|
_pushTimer.Elapsed += OnTimerElapsed; // 绑定静态事件
|
|
// 配置 Fleck 服务
|
_server.Start(connection =>
|
{
|
_pushTimer.Start();
|
// 客户端连接建立时
|
connection.OnOpen = () =>
|
{
|
string userId = "";
|
string userName = "";
|
// 获取参数
|
string path = connection.ConnectionInfo.Path;
|
Uri dummyUri = new Uri($"http://localhost{path}");
|
var queryParams = HttpUtility.ParseQueryString(dummyUri.Query);
|
|
userId = queryParams["userId"];
|
userName = queryParams["userName"];
|
|
LogService.Write($"WebSocket 客户端连接:{userId}_{userName}-{connection.ConnectionInfo.ClientIpAddress}");
|
_connections.Add(connection); // 保存连接
|
_NameConnectionDict[connection] = $@"{userId}_{userName}";
|
};
|
|
// 收到客户端消息时
|
connection.OnMessage = message =>
|
{
|
Console.WriteLine($"收到消息:{message}");
|
|
// TODO 心跳信号
|
};
|
|
// 客户端断开连接时
|
connection.OnClose = () =>
|
{
|
LogService.Write($"WebSocket 客户端断开:{connection.ConnectionInfo.ClientIpAddress}");
|
|
string success = "";
|
// 移除连接
|
_NameConnectionDict.TryRemove(connection,out success);
|
// _connections.Remove(connection);
|
};
|
|
// 连接出错时
|
connection.OnError = ex =>
|
{
|
string success = "";
|
LogService.Write($"WebSocket 错误:{ex.Message}");
|
_NameConnectionDict.TryRemove(connection, out success);
|
};
|
});
|
|
LogService.Write($"Fleck WebSocket 服务已启动,监听:{wsUrl}");
|
}
|
|
/// <summary>
|
/// 停止 WebSocket 服务
|
/// </summary>
|
public static void Stop()
|
{
|
_server?.Dispose();
|
_pushTimer?.Stop();
|
|
Console.WriteLine("Fleck WebSocket 服务已停止");
|
}
|
|
|
/// <summary>
|
/// 静态定时器触发事件
|
/// </summary>
|
private static void OnTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
|
{
|
SQLHelper.ClsCN oCN = new SQLHelper.ClsCN();
|
DataSet ds = new DataSet();
|
try
|
{
|
JObject NameMsgKVP = new JObject();
|
// 获取数据库中未被读取的数据
|
ds = oCN.RunProcReturn("select * from h_v_OA_WorkLinkBillAllList where 单据状态 = '已审核' and 阅读状态 = '未阅'", "h_v_OA_WorkLinkBillAllList");
|
// 判断广播还是私发 ,统计数据
|
JArray dataTableJArray = JArray.Parse(JsonConvert.SerializeObject(ds.Tables[0]));
|
foreach (JToken row in dataTableJArray)
|
{
|
// 提前获取并处理行数据(避免重复访问 row["列名"],且处理 null)
|
string receiver = row["接收人"]?.ToString() ?? string.Empty; // 为 null 时返回空字符串
|
string ccUsers = row["所有抄送人"]?.ToString() ?? string.Empty;
|
string sendType = row["发送类型"]?.ToString() ?? string.Empty;
|
|
// 跳过无效数据行(三列均为空时,无需处理)
|
if (string.IsNullOrEmpty(receiver) && string.IsNullOrEmpty(ccUsers) && string.IsNullOrEmpty(sendType))
|
continue;
|
|
foreach (var dictOne in _NameConnectionDict)
|
{
|
string UName = dictOne.Value.Split('_')[1];
|
// 将接收人 抄送人 获取 广播的消息添加到消息缓存
|
if (string.Equals(row["接收人"].ToString(), UName)
|
|| row["所有抄送人"].ToString().Contains(UName)
|
|| string.Equals(row["发送类型"].ToString(), "公共"))
|
{
|
string dictOneVal = dictOne.Value.ToString();
|
if (NameMsgKVP[dictOneVal] == null)
|
{
|
NameMsgKVP[dictOneVal] = new JArray();
|
}
|
|
var jArray = NameMsgKVP[dictOneVal] as JArray;
|
|
jArray.Add(row);
|
}
|
}
|
}
|
// 根据用户ID分发到不同的客户端
|
foreach (var dictOne in _NameConnectionDict)
|
{
|
if(NameMsgKVP[dictOne.Value.ToString()] != null)
|
{
|
// 封装 websocket 信息
|
dictOne.Key.Send(new JObject{
|
["Type"]="Message",
|
["Content"]= NameMsgKVP[dictOne.Value.ToString()].ToString()
|
}.ToString());
|
}
|
}
|
LogService.Write("消息推送完成...");
|
}
|
catch (Exception ex)
|
{
|
LogService.Write(ex.ToString());
|
return;
|
}
|
finally
|
{
|
// 静态定时器手动重启
|
if (_pushTimer != null && !_pushTimer.Enabled)
|
{
|
_pushTimer.Start();
|
}
|
}
|
}
|
}
|
}
|