继续调整,手动开启定时任务相关逻辑代码,并处理相关bug

This commit is contained in:
alex
2023-09-20 18:20:15 +08:00
parent 4220ca4b8b
commit 8ce7bb540e
12 changed files with 53 additions and 43 deletions

View File

@@ -52,7 +52,7 @@ namespace Tnb.WarehouseMgr.Interfaces
/// 生成任务执行
/// </summary>
/// <returns></returns>
Task GenTaskExecute(CancellationTokenSource? cts = default);
Task GenTaskExecute(CancellationToken? ct = default);
/// <summary>
/// 任务完成
/// </summary>

View File

@@ -11,6 +11,6 @@ namespace Tnb.WarehouseMgr.Interfaces
/// </summary>
public interface IWmsPDATransferSignService
{
Task IsMinStorage(CancellationTokenSource? cts = default);
Task IsMinStorage(CancellationToken? cts = default);
}
}

View File

@@ -11,6 +11,6 @@ namespace Tnb.WarehouseMgr.Interfaces
/// </summary>
public interface IWmsSetSortingService
{
Task PackSortingByAdd(CancellationTokenSource? cts = default);
Task PackSortingByAdd(CancellationToken? cts = default);
}
}

View File

@@ -22,12 +22,12 @@ namespace Tnb.WarehouseMgr.Interfaces
/// 齐套出库(新增状态)
/// </summary>
/// <returns></returns>
Task KittingOutByAdd(CancellationTokenSource? cts = default);
Task KittingOutByAdd(CancellationToken? ct = default);
/// <summary>
/// 齐套出库,(待配送状态)
/// </summary>
/// <param name="cts"></param>
/// <returns></returns>
Task KittingOutByIsToBeShipped(CancellationTokenSource? cts = default);
Task KittingOutByIsToBeShipped(CancellationToken? ct = default);
}
}

View File

@@ -149,7 +149,7 @@ namespace Tnb.WarehouseMgr
/// </summary>
/// <returns></returns>
[HttpPost]
public async Task IsMinStorage(CancellationTokenSource? cts = default)
public async Task IsMinStorage(CancellationToken? cts = default)
{
//if (UserManager.AsscessToken.IsNullOrWhiteSpace()) return;
//var curUser = await GetUserIdentity();
@@ -224,7 +224,7 @@ namespace Tnb.WarehouseMgr
userIdentity = await GetUserIdentity(_userManager.ToKen),
};
var timedTaskEx = ex.ToTimedTaskException(ei);
cts?.Cancel();
//cts?.Cancel();
throw timedTaskEx;
}
}

View File

@@ -42,7 +42,7 @@ namespace Tnb.WarehouseMgr
public bool IsStarted { get; set; }
private IEventPublisher _eventPublisher = default!;
private readonly IServiceProvider _serviceProvider;
private static Dictionary<string, Func<CancellationTokenSource?, Task>> _timedFuncMap = new(StringComparer.OrdinalIgnoreCase);
private static Dictionary<string, Func<CancellationToken?, Task>> _timedFuncMap = new(StringComparer.OrdinalIgnoreCase);
static TimedTaskBackgroundService()
{
Task.Run(() =>
@@ -50,7 +50,7 @@ namespace Tnb.WarehouseMgr
_timedFuncMap = App.EffectiveTypes.AsParallel().Where(t => !t.Namespace.IsNullOrWhiteSpace() && t.Namespace.Equals("Tnb.WarehouseMgr", StringComparison.OrdinalIgnoreCase)).SelectMany(t => t.GetMethods())
.Where(m => m.GetCustomAttribute<TimedAttribute>() != null)
.ToDictionary(x => x.Name, x =>
(Func<CancellationTokenSource?, Task>)Delegate.CreateDelegate(typeof(Func<CancellationTokenSource?, Task>), App.GetService(x.DeclaringType), x));
(Func<CancellationToken?, Task>)Delegate.CreateDelegate(typeof(Func<CancellationToken?, Task>), App.GetService(x.DeclaringType), x));
});
}
public TimedTaskBackgroundService(IServiceProvider serviceProvider)
@@ -74,7 +74,7 @@ namespace Tnb.WarehouseMgr
{
if (_timedFuncMap.ContainsKey(message.TaskName))
{
await _timedFuncMap[message.TaskName].Invoke(cts);
await _timedFuncMap[message.TaskName].Invoke(stoppingToken);
}
}
}
@@ -83,24 +83,16 @@ namespace Tnb.WarehouseMgr
var timedTask = Task.Run(() =>
{
_eventPublisher = App.GetRequiredService<IEventPublisher>();
////生成任务执行
//CancellationTokenSource genTaskCTS = new();
CancellationTokenSource kittingOutAddCts = new();
CancellationTokenSource kittingOutShippedCts = new();
CancellationTokenSource setSortingCts = new();
CancellationTokenSource isMinStorageCts = new();
//var wareHouseService = App.GetRequiredService<IWareHouseService>();
//TimedTask(cts => wareHouseService.GenTaskExecute(cts), genTaskCTS);
//齐套出库
var kittingOutService = App.GetRequiredService<IWmskittingOutService>();
TimedTask(cts => kittingOutService.KittingOutByAdd(cts), kittingOutAddCts, 1);
TimedTask(cts => kittingOutService.KittingOutByIsToBeShipped(cts), kittingOutShippedCts, 1);
TimedTask(cts => kittingOutService.KittingOutByAdd(stoppingToken), stoppingToken, 1);
TimedTask(cts => kittingOutService.KittingOutByIsToBeShipped(stoppingToken), stoppingToken, 1);
//齐套分拣
var setSortingService = App.GetRequiredService<IWmsSetSortingService>();
TimedTask(cts => setSortingService.PackSortingByAdd(cts), setSortingCts, 1);
TimedTask(cts => setSortingService.PackSortingByAdd(cts), stoppingToken, 1);
//最低库存检查
var transferSignService = App.GetRequiredService<IWmsPDATransferSignService>();
TimedTask(cts => transferSignService.IsMinStorage(cts), isMinStorageCts, 30, TimeSpanUnit.Minutes);
TimedTask(cts => transferSignService.IsMinStorage(cts), stoppingToken, 30, TimeSpanUnit.Minutes);
}, stoppingToken);
@@ -109,14 +101,14 @@ namespace Tnb.WarehouseMgr
private Task TimedTask(Func<CancellationTokenSource, Task> action, CancellationTokenSource cts, int interval, TimeSpanUnit timeType = TimeSpanUnit.Seconds)
private Task TimedTask(Func<CancellationToken, Task> action, CancellationToken ct, int interval, TimeSpanUnit timeType = TimeSpanUnit.Seconds)
{
var token = cts.Token;
var token = ct;
return Task.Factory.StartNew(async () =>
{
while (!token.IsCancellationRequested)
{
await action(cts).Catch(async ex =>
await action(ct).Catch(async ex =>
{
if (ex is TimedTaskException timedTaskEx and not null)
{
@@ -137,7 +129,7 @@ namespace Tnb.WarehouseMgr
});
await TaskDelay(timeType, interval);
}
}, cts.Token, TaskCreationOptions.None, new CustomerTaskScheduler());
}, ct, TaskCreationOptions.None, new CustomerTaskScheduler());
#region ThreadPool 线线饿
//return Task.Run(async () =>
@@ -172,6 +164,7 @@ namespace Tnb.WarehouseMgr
public override Task StopAsync(CancellationToken cancellationToken)
{
IsStarted = false;
return Task.CompletedTask;
//return base.StopAsync(cancellationToken);

View File

@@ -211,9 +211,10 @@ namespace Tnb.WarehouseMgr
whereExprable.And(carryStatusFilterExp);
var whereExpr = whereExprable.ToExpression();
var policy = await _db.Queryable<WmsInstockPolicies>().Where(it => it.status == 1).FirstAsync();
var cyDb = _db.CopyNew();
var policy = await cyDb.Queryable<WmsInstockPolicies>().Where(it => it.status == 1).FirstAsync();
if (policy == null) throw new AppFriendlyException("没有可用策略", 500);
var items = await _db.Queryable<WmsCarryH>().LeftJoin<WmsCarryCode>((a, b) => a.id == b.carry_id)
var items = await cyDb.Queryable<WmsCarryH>().LeftJoin<WmsCarryCode>((a, b) => a.id == b.carry_id)
.LeftJoin<BasLocation>((a, b, c) => a.location_id == c.id)
.Where(whereExpr)
.OrderBy(policy.policy)
@@ -226,7 +227,7 @@ namespace Tnb.WarehouseMgr
/// </summary>
/// <returns></returns>
[HttpPost, Timed(Name = nameof(IWareHouseService.GenTaskExecute))]
public async Task GenTaskExecute(CancellationTokenSource? cts = default)
public async Task GenTaskExecute(CancellationToken? ct = default)
{
Stopwatch sw = Stopwatch.StartNew();
CancellationTokenSource agvCts = new();
@@ -378,13 +379,11 @@ namespace Tnb.WarehouseMgr
{
Log.Error("生成预任务执行时出现错误", ex);
await db.Ado.RollbackTranAsync();
cts?.Cancel();
throw;
}
finally
{
agvCts.Dispose();
cts?.Dispose();
}
}
/// <summary>

View File

@@ -95,7 +95,7 @@ namespace Tnb.WarehouseMgr
/// </summary>
/// <returns></returns>
[HttpPost, Timed(Name = nameof(PackSortingByAdd))]
public async Task PackSortingByAdd(CancellationTokenSource? cts = default)
public async Task PackSortingByAdd(CancellationToken? ct = default)
{
//if (UserManager.AsscessToken.IsNullOrWhiteSpace()) return;
//var curUser = await GetUserIdentity();
@@ -270,7 +270,7 @@ namespace Tnb.WarehouseMgr
userIdentity = await GetUserIdentity(_userManager.ToKen),
};
var timedTaskEx = ex.ToTimedTaskException(ei);
cts?.Cancel();
//cts?.Cancel();
throw timedTaskEx;
}
finally

View File

@@ -78,10 +78,14 @@ namespace Tnb.WarehouseMgr
/// </summary>
/// <returns></returns>
[HttpPost, Timed(Name = nameof(KittingOutByAdd))]
public async Task KittingOutByAdd(CancellationTokenSource? cts = default)
public async Task KittingOutByAdd(CancellationToken? ct = default)
{
//if (UserManager.AsscessToken.IsNullOrWhiteSpace()) return;
//var curUser = await GetUserIdentity();
if (ct?.IsCancellationRequested ?? false)
{
ct?.ThrowIfCancellationRequested();
}
var curDb = _db.CopyNew();
try
{
@@ -174,7 +178,7 @@ namespace Tnb.WarehouseMgr
userIdentity = await GetUserIdentity(_userManager.ToKen),
};
var timedTaskEx = ex.ToTimedTaskException(ei);
cts?.Cancel();
//cts?.Cancel();
throw timedTaskEx;
}
}
@@ -183,7 +187,7 @@ namespace Tnb.WarehouseMgr
/// </summary>
/// <returns></returns>
[HttpPost, Timed(Name = nameof(KittingOutByIsToBeShipped))]
public async Task KittingOutByIsToBeShipped(CancellationTokenSource? cts = default)
public async Task KittingOutByIsToBeShipped(CancellationToken? ct = default)
{
//if (UserManager.AsscessToken.IsNullOrWhiteSpace()) return;
//var curUser = await GetUserIdentity();
@@ -281,7 +285,7 @@ namespace Tnb.WarehouseMgr
userIdentity = await GetUserIdentity(_userManager.ToKen),
};
var timedTaskEx = ex.ToTimedTaskException(ei);
cts?.Cancel();
//cts?.Cancel();
throw timedTaskEx;
}
finally