将定时任务改为,发布、订阅模式的消息队列执行任务

This commit is contained in:
alex
2023-08-15 13:53:04 +08:00
parent 4c1e3c8c40
commit 28b7800baf
13 changed files with 238 additions and 45 deletions

View File

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tnb.WarehouseMgr.Entities.Attributes
{
/// <summary>
///定时任务特性
/// </summary>
[AttributeUsage(AttributeTargets.Method, AllowMultiple = false)]
public class TimedAttribute : Attribute
{
public string Name { get; set; }
}
}

View File

@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Tnb.WarehouseMgr.Entities.Dto.Inputs
{
/// <summary>
/// 通知消息类
/// </summary>
public class NotifyMessage
{
/// <summary>
/// 任务名称
/// </summary>
public string TaskName { get; set; } = string.Empty;
}
}

View File

@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
using Tnb.WarehouseMgr.Entities.Dto.Inputs;
namespace Tnb.WarehouseMgr.Interfaces
{
/// <summary>
/// 任务消息通知接口
/// </summary>
public interface ITaskMessageNotify
{
ChannelReader<NotifyMessage> Reader { get; }
ChannelWriter<NotifyMessage> Writer { get; }
}
}

View File

@@ -0,0 +1,27 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc.Filters;
using Microsoft.Extensions.DependencyInjection;
using Tnb.WarehouseMgr.Entities.Dto.Inputs;
using Tnb.WarehouseMgr.Interfaces;
namespace Tnb.WarehouseMgr.Filters
{
public class NotifyFilterAttribute : ActionFilterAttribute
{
public override async void OnActionExecuted(ActionExecutedContext context)
{
var actionName = context.ActionDescriptor.RouteValues["action"]!;
var taskMessageNotify = context.HttpContext.RequestServices.GetRequiredService<ITaskMessageNotify>();
if (taskMessageNotify != null)
{
NotifyMessage message = new() { TaskName = actionName };
await taskMessageNotify.Writer.WriteAsync(message);
}
}
}
}

View File

@@ -0,0 +1,45 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Tnb.WarehouseMgr.Entities.Dto.Inputs;
using Tnb.WarehouseMgr.Interfaces;
namespace Tnb.WarehouseMgr
{
/// <summary>
/// 任务消息通知
/// </summary>
public class TaskMesageNotify : ITaskMessageNotify
{
private readonly Channel<NotifyMessage> _channel = Channel.CreateUnbounded<NotifyMessage>();
public ChannelReader<NotifyMessage> Reader => _channel.Reader;
public ChannelWriter<NotifyMessage> Writer => _channel.Writer;
}
//public static class TaskMesageNotify
//{
// private static readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
// public static ChannelReader<string> Reader => _channel.Reader;
// public static ChannelWriter<string> Writer => _channel.Writer;
//}
public static class TaskMessageNotifyExtensions
{
public static IServiceCollection AddTaskMessageNotify(this IServiceCollection services)
{
services.AddSingleton<ITaskMessageNotify, TaskMesageNotify>();
return services;
}
}
}

View File

@@ -1,6 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Aop.Api.Domain;
@@ -17,9 +20,12 @@ using JNPF.Message.Interfaces.Message;
using JNPF.Systems.Entitys.System;
using Mapster;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Natasha.CSharp;
using Tnb.Common.Extension;
using Tnb.WarehouseMgr.Entities.Attributes;
using Tnb.WarehouseMgr.Entities.Dto.Inputs;
using Tnb.WarehouseMgr.Entities.Exceptions;
using Tnb.WarehouseMgr.Interfaces;
@@ -33,27 +39,58 @@ namespace Tnb.WarehouseMgr
public class TimedTaskBackgroundService : BackgroundService
{
private IEventPublisher _eventPublisher = default!;
private readonly IServiceProvider _serviceProvider;
private static Dictionary<string, Func<CancellationTokenSource?, Task>> _timedFuncMap = new(StringComparer.OrdinalIgnoreCase);
static TimedTaskBackgroundService()
{
_timedFuncMap = App.EffectiveTypes.AsParallel().Where(t => !t.Namespace.IsNullOrWhiteSpace() && t.Namespace.Contains("Tnb.WarehouseMgr")).SelectMany(t => t.GetMethods())
.Where(m => m.GetCustomAttribute<TimedAttribute>() != null)
.ToDictionary(x => x.Name, x =>
(Func<CancellationTokenSource?, Task>)Delegate.CreateDelegate(typeof(Func<CancellationTokenSource?, Task>), App.GetService(x.DeclaringType), x));
}
public TimedTaskBackgroundService(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.Run(() =>
protected override Task ExecuteAsync(CancellationToken stoppingToken) => Task.Run(async () =>
{
_eventPublisher = App.GetRequiredService<IEventPublisher>();
List<string> toUserIds = new List<string>() { "25398501929509" };
//生成任务执行
CancellationTokenSource genTaskCTS = new();
CancellationTokenSource kittingOutAddCts = new();
CancellationTokenSource kittingOutShippedCts = new();
CancellationTokenSource setSortingCts = new();
var wareHouseService = App.GetRequiredService<IWareHouseService>();
TimedTask(cts => wareHouseService.GenTaskExecute(cts), genTaskCTS, toUserIds);
//齐套出库
var channelReader = _serviceProvider.GetRequiredService<ITaskMessageNotify>().Reader;
var kittingOutService = App.GetRequiredService<IWmskittingOutService>();
TimedTask(cts => kittingOutService.KittingOutByAdd(cts), kittingOutAddCts, toUserIds);
TimedTask(cts => kittingOutService.KittingOutByIsToBeShipped(cts), kittingOutShippedCts, toUserIds);
//齐套分拣
var setSortingService = App.GetRequiredService<IWmsSetSortingService>();
TimedTask(cts => setSortingService.PackSortingByAdd(cts), setSortingCts, toUserIds);
CancellationTokenSource? cts = new();
while (channelReader != null && await channelReader.WaitToReadAsync())
{
while (channelReader.TryRead(out var message))
{
if (_timedFuncMap.ContainsKey(message.TaskName))
{
await _timedFuncMap[message.TaskName].Invoke(cts);
}
}
}
#region
//_eventPublisher = App.GetRequiredService<IEventPublisher>();
//List<string> toUserIds = new List<string>() { "25398501929509" };
////生成任务执行
//CancellationTokenSource genTaskCTS = new();
//CancellationTokenSource kittingOutAddCts = new();
//CancellationTokenSource kittingOutShippedCts = new();
//CancellationTokenSource setSortingCts = new();
//var wareHouseService = App.GetRequiredService<IWareHouseService>();
//TimedTask(cts => wareHouseService.GenTaskExecute(cts), genTaskCTS, toUserIds);
////齐套出库
//var kittingOutService = App.GetRequiredService<IWmskittingOutService>();
//TimedTask(cts => kittingOutService.KittingOutByAdd(cts), kittingOutAddCts, toUserIds);
//TimedTask(cts => kittingOutService.KittingOutByIsToBeShipped(cts), kittingOutShippedCts, toUserIds);
////齐套分拣
//var setSortingService = App.GetRequiredService<IWmsSetSortingService>();
//TimedTask(cts => setSortingService.PackSortingByAdd(cts), setSortingCts, toUserIds);
#endregion
}, stoppingToken);
private Task TimedTask(Func<CancellationTokenSource, Task> action, CancellationTokenSource cts, List<string>? toUserIds = default)

View File

@@ -42,6 +42,7 @@ using Tnb.BasicData.Entities.Enums;
using Tnb.Common.Extension;
using Tnb.Common.Utils;
using Tnb.WarehouseMgr.Entities;
using Tnb.WarehouseMgr.Entities.Attributes;
using Tnb.WarehouseMgr.Entities.Configs;
using Tnb.WarehouseMgr.Entities.Consts;
using Tnb.WarehouseMgr.Entities.Dto;
@@ -222,16 +223,16 @@ namespace Tnb.WarehouseMgr
/// 生成任务执行
/// </summary>
/// <returns></returns>
[HttpPost]
[HttpPost, Timed(Name = nameof(IWareHouseService.GenTaskExecute))]
public async Task GenTaskExecute(CancellationTokenSource? cts = default)
{
Stopwatch sw = Stopwatch.StartNew();
CancellationTokenSource agvCts = new();
//获取用户登录令牌
var aToken = await _cacheManager.GetAsync("AsscessToken");
if (aToken.IsNullOrWhiteSpace()) return;
var curUser = await GetUserIdentity(aToken);
//var aToken = await _cacheManager.GetAsync("AsscessToken");
//if (aToken.IsNullOrWhiteSpace()) return;
//var curUser = await GetUserIdentity(aToken);
var db = _db.CopyNew();
try
@@ -239,7 +240,7 @@ namespace Tnb.WarehouseMgr
//获取所有未下发的预任务申请
var preTasks = await db.Queryable<WmsPretaskH>().InnerJoin<WmsCarryH>((a, b) => a.startlocation_id == b.location_id && a.carry_id == b.id)
.InnerJoin<WmsAreaH>((a, b, c) => a.area_id == c.id)
.Where(a => a.status == WmsWareHouseConst.PRETASK_BILL_STATUS_DXF_ID)
.Where(a => a.status == WmsWareHouseConst.PRETASK_BILL_STATUS_DXF_ID && !string.IsNullOrWhiteSpace(a.startlocation_id))
.OrderBy(a => new { priority = SqlFunc.Desc(a.priority), a.bill_code })
.Select((a, b, c) => new WmsPretaskH
{
@@ -379,18 +380,9 @@ namespace Tnb.WarehouseMgr
catch (Exception ex)
{
Log.Error("生成预任务执行时出现错误", ex);
var opts = curUser.FindFirst(ClaimConst.CONNECTIONCONFIG)?.Value;
TimedTaskErrorInfo ei = new()
{
RequestURL = App.HttpContext?.Request?.Path,
RequestMethod = App.HttpContext?.Request?.Method,
userIdentity = curUser,
};
var timedTaskEx = ex.ToTimedTaskException(ei);
await db.Ado.RollbackTranAsync();
cts?.Cancel();
throw timedTaskEx;
throw;
}
finally
{
@@ -531,11 +523,15 @@ namespace Tnb.WarehouseMgr
locIts.Add(loc);
}
await _db.Updateable(carryIts).UpdateColumns(it => new { it.is_lock, it.location_id, it.location_code }).ExecuteCommandAsync();
//更新条码的库位和仓库信息
await _db.Updateable(carryCodeIts).UpdateColumns(it => new { it.warehouse_id, it.location_id, it.location_code }).Where(it => multiList.Select(x => x.carry_id).Contains(it.carry_id)).ExecuteCommandAsync();
var carryIdItor = multiList.Select(x => x.carry_id);
await _db.Updateable(carryCodeIts).UpdateColumns(it => new { it.warehouse_id, it.location_id, it.location_code }).Where(it => it.carry_id.In(carryIdItor)).ExecuteCommandAsync();
//更新库位信息,使用状态为 使用,锁定状态为未锁定
await _db.Updateable(locIts).UpdateColumns(it => new { it.is_use, it.is_lock }).Where(it => multiList.Select(x => x.endlocation_id).Contains(it.id)).ExecuteCommandAsync();
var locIdItor = multiList.Select(x => x.endlocation_id);
await _db.Updateable(locIts).UpdateColumns(it => new { it.is_use, it.is_lock }).Where(it => it.id.In(locIdItor)).ExecuteCommandAsync();
//更新业务主表的单据状态
foreach (var dt in disTasks)
{

View File

@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;
using JNPF.Common.Contracts;
using JNPF.Common.Core.Manager;
@@ -24,6 +25,7 @@ using Tnb.WarehouseMgr.Entities.Attributes;
using Tnb.WarehouseMgr.Entities.Consts;
using Tnb.WarehouseMgr.Entities.Dto;
using Tnb.WarehouseMgr.Entities.Dto.Inputs;
using Tnb.WarehouseMgr.Filters;
using Tnb.WarehouseMgr.Interfaces;
namespace Tnb.WarehouseMgr
@@ -44,13 +46,16 @@ namespace Tnb.WarehouseMgr
private readonly IWareHouseService _wareHouseService;
private readonly IBillRullService _billRullService;
private readonly IUserManager _userManager;
private readonly ChannelWriter<NotifyMessage> _channelWriter;
public WmsEmptyInstockService(
ISqlSugarRepository<WmsCarryH> repository,
IRunService runService,
IVisualDevService visualDevService,
IWareHouseService wareHouseService,
IUserManager userManager,
IBillRullService billRullService)
IBillRullService billRullService,
ITaskMessageNotify taskMesageNotify
)
{
_db = repository.AsSugarClient();
_runService = runService;
@@ -58,9 +63,9 @@ namespace Tnb.WarehouseMgr
_wareHouseService = wareHouseService;
_userManager = userManager;
_billRullService = billRullService;
_channelWriter = taskMesageNotify.Writer;
OverideFuncs.CreateAsync = WmsEmptyIn;
}
private async Task<dynamic> WmsEmptyIn(VisualDevModelDataCrInput input)
{
@@ -159,6 +164,7 @@ namespace Tnb.WarehouseMgr
await _wareHouseService.GenInStockTaskHandleAfter(preTaskUpInput,
it => new WmsCarryH { is_lock = 1, location_id = preTaskUpInput.CarryStartLocationId, location_code = preTaskUpInput.CarryStartLocationCode },
it => new BasLocation { is_lock = 1 });
}
else
{
@@ -179,6 +185,12 @@ namespace Tnb.WarehouseMgr
await _db.Ado.RollbackTranAsync();
throw;
}
finally
{
//向队列写入消息
NotifyMessage message = new() { TaskName = nameof(IWareHouseService.GenTaskExecute) };
await _channelWriter.WriteAsync(message);
}
return Task.FromResult(true);
}
/// <summary>

View File

@@ -58,7 +58,7 @@ namespace Tnb.WarehouseMgr
/// 齐套分拣(新增状态)
/// </summary>
/// <returns></returns>
[HttpPost]
[HttpPost, Timed(Name = nameof(PackSortingByAdd))]
public async Task PackSortingByAdd(CancellationTokenSource? cts = default)
{
var aToken = await _cacheManager.GetAsync("AccessToken");

View File

@@ -75,7 +75,7 @@ namespace Tnb.WarehouseMgr
/// 齐套出库(新增状态)
/// </summary>
/// <returns></returns>
[HttpPost]
[HttpPost, Timed(Name = nameof(KittingOutByAdd))]
public async Task KittingOutByAdd(CancellationTokenSource? cts = default)
{
var aToken = await _cacheManager.GetAsync("AsscessToken");
@@ -163,7 +163,7 @@ namespace Tnb.WarehouseMgr
}
catch (Exception ex)
{
JNPF.Logging.Log.Error("齐套出库,新增时出现错误", ex);
Log.Error("齐套出库,新增时出现错误", ex);
await curDb.Ado.RollbackTranAsync();
TimedTaskErrorInfo ei = new()
{
@@ -175,12 +175,16 @@ namespace Tnb.WarehouseMgr
cts?.Cancel();
throw timedTaskEx;
}
finally
{
}
}
/// <summary>
/// 齐套出库,(待配送状态)
/// </summary>
/// <returns></returns>
[HttpPost]
[HttpPost, Timed(Name = nameof(KittingOutByIsToBeShipped))]
public async Task KittingOutByIsToBeShipped(CancellationTokenSource? cts = default)
{
var aToken = await _cacheManager.GetAsync("AsscessToken");
@@ -272,7 +276,7 @@ namespace Tnb.WarehouseMgr
}
catch (Exception ex)
{
JNPF.Logging.Log.Error("齐套出库,待配送时出现错误", ex);
Log.Error("齐套出库,待配送时出现错误", ex);
TimedTaskErrorInfo ei = new()
{
RequestURL = App.HttpContext?.Request?.Path,

View File

@@ -62,7 +62,8 @@ public class Startup : AppStartup
.AddSenparcWeixinServices(App.Configuration); // Senparc.Weixin 注册如果使用Senparc.Weixin SDK则添加
services.AddOverideVisualDev();
//注册任务消息通知 added by ly on 20230814
services.AddTaskMessageNotify();
//定时任务
services.AddHostedService<TimedTaskBackgroundService>();

View File

@@ -70,4 +70,21 @@ public static class EnumerableExtensions
collection = collection as IList<T> ?? collection.ToList();
return !collection.Any();
}
/// <summary>
/// Contains扩展
/// added by ly on 20230814
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="obj"></param>
/// <param name="collection"></param>
/// <returns></returns>
public static bool In<T>(this T obj, IEnumerable<T> collection)
{
var result = false;
foreach (var item in collection)
{
result |= item.Equals(obj);
}
return result;
}
}

View File

@@ -607,8 +607,7 @@ public class OAuthService : IDynamicApiController, ITransient
}, tokenTimeout);
//modify by ly on 20230804
UserManager.AsscessToken = accessToken;
await _cacheManager.SetAsync("AsscessToken", accessToken,TimeSpan.FromMinutes(30));
await _cacheManager.SetAsync("AsscessToken", accessToken, TimeSpan.FromMinutes(-1));
// 单点登录标识缓存
if (_oauthOptions.Enabled) _cacheManager.Set("OnlineTicket_" + input.online_ticket, options.ConfigId);