This commit is contained in:
2023-05-31 10:19:05 +08:00
parent 1b65a7a9e5
commit 9c621c75cd
238 changed files with 9905 additions and 4034 deletions

View File

@@ -1,4 +1,5 @@
using JNPF.Common.Security;
using JNPF.Common.Extension;
using JNPF.Common.Security;
using JNPF.EventBus;
using JNPF.Logging;
using RabbitMQ.Client;
@@ -59,8 +60,6 @@ public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
// 创建通道
_model = _connection.CreateModel();
string ExchangeName = "MXK_IDENTITY_MAIN_TOPIC";
/*
* 声明路由队列
* 队列可以重复声明,但是声明所使用的参数必须一致,否则会抛出异常
@@ -77,7 +76,7 @@ public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
_model.QueueDeclare(queue: routeKey, durable: false, exclusive: false, autoDelete: false, arguments: null);
// 将MaxKey 交换机绑定到 路由中
_model.QueueBind(queue: routeKey, exchange: ExchangeName, routingKey: "#");
// _model.QueueBind(queue: routeKey, exchange: "MXK_IDENTITY_MAIN_TOPIC", routingKey: "#");
// 字节限制,一次接收的消息数,全局/
// 据说prefetchSize 和global这两项rabbitmq没有实现暂且不研究
@@ -95,6 +94,9 @@ public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
// 转换为 IEventSource这里可以选择自己喜欢的序列化工具如果自定义了 EventSource注意属性是可读可写
var eventSource = JsonSerializer.Deserialize<ChannelEventSource>(stringEventSource);
// 判断到是单点登录服务端信息
if (eventSource.EventId.IsNullOrEmpty()) eventSource = new ChannelEventSource("User:Maxkey_Identity", stringEventSource);
Log.Information($"- 接收到消息:{eventSource.ToJsonString()}");
// 写入内存管道存储器
@@ -105,7 +107,7 @@ public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
};
// 启动消费者 设置为手动应答消息
_model.BasicConsume(queue: routeKey, autoAck: true, consumer: consumer);
_model.BasicConsume(queue: routeKey, autoAck: false, consumer: consumer);
}
/// <summary>
@@ -122,7 +124,7 @@ public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
throw new ArgumentNullException(nameof(eventSource));
}
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSouce
// 这里判断是否是 ChannelEventSource 或者 自定义的 EventSource
if (eventSource is ChannelEventSource source)
{
// 序列化,这里可以选择自己喜欢的序列化工具
@@ -146,7 +148,8 @@ public sealed class RabbitMQEventSourceStorer : IEventSourceStorer, IDisposable
public async ValueTask<IEventSource> ReadAsync(CancellationToken cancellationToken)
{
// 读取一条事件源
return await _channel.Reader.ReadAsync(cancellationToken);
var eventSource = await _channel.Reader.ReadAsync(cancellationToken);
return eventSource;
}
/// <summary>