using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
using Aop.Api.Domain;
using JNPF;
using JNPF.Common.Core.Manager;
using JNPF.Common.Dtos.Message;
using JNPF.Common.Extension;
using JNPF.Common.Security;
using JNPF.EventBus;
using JNPF.EventHandler;
using JNPF.FriendlyException;
using JNPF.Logging;
using JNPF.Message.Interfaces.Message;
using JNPF.Systems.Entitys.System;
using Mapster;
using Microsoft.CodeAnalysis.CSharp.Syntax;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Natasha.CSharp;
using NetTaste;
using Tnb.Common.Extension;
using Tnb.Common.Utils;
using Tnb.WarehouseMgr.Entities.Attributes;
using Tnb.WarehouseMgr.Entities.Configs;
using Tnb.WarehouseMgr.Entities.Dto.Inputs;
using Tnb.WarehouseMgr.Entities.Dto.Outputs;
using Tnb.WarehouseMgr.Entities.Enums;
using Tnb.WarehouseMgr.Entities.Exceptions;
using Tnb.WarehouseMgr.Interfaces;

namespace Tnb.WarehouseMgr
{
    /// <summary>
    /// Hosted background service that starts the warehouse module's recurring jobs
    /// (task generation, AGV keep-alive heartbeat, minimum-stock check).
    /// added by ly on 20230802
    /// </summary>
    public class TimedTaskBackgroundService : BackgroundService
    {
        /// <summary>
        /// True from the moment <see cref="ExecuteAsync"/> is entered until <see cref="StopAsync"/> is called.
        /// </summary>
        public bool IsStarted { get; set; }

        // Resolved lazily inside ExecuteAsync; only referenced by the (currently disabled) exception logging.
        private IEventPublisher _eventPublisher = default!;
        private readonly IServiceProvider _serviceProvider;

        // Name -> delegate map used only by the disabled queue-driven dispatch below.
        private static Dictionary<string, Func<CancellationToken, Task>> _timedFuncMap = new(StringComparer.OrdinalIgnoreCase);

        static TimedTaskBackgroundService()
        {
            // Disabled: reflection-based discovery of attributed timed-task methods.
            //Task.Run(() =>
            //{
            //    _timedFuncMap = App.EffectiveTypes.AsParallel().Where(t => !t.Namespace.IsNullOrWhiteSpace() && t.Namespace.Equals("Tnb.WarehouseMgr", StringComparison.OrdinalIgnoreCase)).SelectMany(t => t.GetMethods())
            //    .Where(m => m.GetCustomAttribute() != null)
            //    .ToDictionary(x => x.Name, x =>
            //    (Func)Delegate.CreateDelegate(typeof(Func), App.GetService(x.DeclaringType), x));
            //});
        }

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="serviceProvider">Application service provider; stored for later use.</param>
        public TimedTaskBackgroundService(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
        }

        /// <summary>
        /// Starts every recurring job and returns once they have been started.
        /// The jobs themselves keep running on their own schedulers until
        /// <paramref name="stoppingToken"/> is cancelled.
        /// </summary>
        /// <param name="stoppingToken">Token signalled when the host asks the service to stop.</param>
        /// <returns>A task that completes when all jobs have been kicked off.</returns>
        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            IsStarted = true;

            // Disabled: channel-driven dispatch of named timed tasks.
            //var queueTask = Task.Run(async () =>
            // {
            //     var channelReader = _serviceProvider.GetRequiredService().Reader;
            //     CancellationTokenSource? cts = new();
            //     while (channelReader != null && await channelReader.WaitToReadAsync())
            //     {
            //         while (channelReader.TryRead(out var message))
            //         {
            //             if (_timedFuncMap.ContainsKey(message.TaskName))
            //             {
            //                 await _timedFuncMap[message.TaskName].Invoke(stoppingToken);
            //             }
            //         }
            //     }
            // }, stoppingToken);

            var timedTask = Task.Run(() =>
            {
                _eventPublisher = App.GetRequiredService<IEventPublisher>();

                // TODO(review): the generic type argument is missing in this copy of the file;
                // restore the service interface that declares GenTaskExecute.
                var whSvc = App.GetRequiredService();
                TimedTask(token => whSvc.GenTaskExecute(token), stoppingToken, 1);

                // Kitting outbound (disabled)
                //var kittingOutService = App.GetRequiredService();
                //TimedTask(token => kittingOutService.KittingOutByAdd(token), stoppingToken, 1);
                //TimedTask(token => kittingOutService.KittingOutByIsToBeShipped(token), stoppingToken, 1);

                // Kitting sorting (disabled)
                //var setSortingService = App.GetRequiredService();
                //TimedTask(token => setSortingService.PackSortingByAdd(token), stoppingToken, 1);

                // TODO(review): the generic type argument of Build is missing in this copy of the file;
                // restore the configuration type exposing DevName, token and WriteTagUrl.
                var _elevatorControlConfiguration = App.Configuration.Build();

                // Heartbeat: writes the "AGVKeepalive" tag through the configured write-tag URL every 30 seconds.
                TimedTask(async token =>
                {
                    // TODO(review): the dictionary's type arguments are missing in this copy of the file;
                    // restore them to match the "pars" parameter of HttpClientHelper.GetAsync.
                    var parameter = new Dictionary();
                    parameter["DevName"] = _elevatorControlConfiguration.DevName;
                    parameter["TagName"] = "AGVKeepalive";
                    parameter["Value"] = "123";
                    parameter["token"] = _elevatorControlConfiguration.token;
                    var result = await HttpClientHelper.GetAsync(_elevatorControlConfiguration.WriteTagUrl, pars: parameter);
                    await Console.Out.WriteLineAsync($"心跳检测结果:{result}");
                }, stoppingToken, 30, TimeSpanUnit.Seconds);

                // Minimum stock check, every 30 minutes.
                // TODO(review): the generic type argument is missing in this copy of the file;
                // restore the service interface that declares IsMinStorage.
                var transferSignService = App.GetRequiredService();
                TimedTask(token => transferSignService.IsMinStorage(token), stoppingToken, 30, TimeSpanUnit.Minutes);
            }, stoppingToken);

            return timedTask;
        }

        /// <summary>
        /// Runs <paramref name="action"/> repeatedly, pausing <paramref name="interval"/> units between
        /// runs, until <paramref name="ct"/> is cancelled.
        /// </summary>
        /// <param name="action">The job to run; receives the cancellation token.</param>
        /// <param name="ct">Token that ends the loop.</param>
        /// <param name="interval">Pause between two runs, expressed in <paramref name="timeType"/> units.</param>
        /// <param name="timeType">Unit of <paramref name="interval"/>; seconds by default.</param>
        /// <returns>A task that completes when the loop has ended.</returns>
        private Task TimedTask(Func<CancellationToken, Task> action, CancellationToken ct, int interval, TimeSpanUnit timeType = TimeSpanUnit.Seconds)
        {
            // Each job gets its own CustomerTaskScheduler (and therefore its own dedicated thread)
            // instead of Task.Run: the original author noted that running these loops on
            // thread-pool threads led to thread starvation.
            return Task.Factory.StartNew(async () =>
            {
                while (!ct.IsCancellationRequested)
                {
                    await action(ct).Catch(async ex =>
                    {
                        if (ex is TimedTaskException timedTaskEx)
                        {
                            // Exception logging is currently disabled; kept for reference.
                            //await _eventPublisher.PublishAsync(new LogEventSource("Log:CreateExLog", timedTaskEx.options!, new SysLogEntity
                            //{
                            //    Id = SnowflakeIdHelper.NextId(),
                            //    Category = 4,
                            //    UserId = timedTaskEx.UserId,
                            //    UserName = timedTaskEx.UserName,
                            //    IPAddress = NetHelper.Ip,
                            //    RequestURL = timedTaskEx.RequestURL,
                            //    RequestMethod = timedTaskEx.RequestMethod,
                            //    Json = timedTaskEx + "\n" + timedTaskEx.InnerException?.StackTrace + "\n" + timedTaskEx?.TargetSite?.GetParameters().ToString(),
                            //    //PlatForm = string.Format("{0}-{1}", userAgent.OS.ToString(), userAgent.RawValue),
                            //    CreatorTime = DateTime.Now
                            //}));
                        }
                    });

                    try
                    {
                        // Cancellable wait so shutdown does not have to sit out a full interval.
                        await TaskDelay(timeType, interval, ct);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                }
            }, ct, TaskCreationOptions.None, new CustomerTaskScheduler())
            // StartNew with an async delegate yields Task<Task>; unwrap so the returned task tracks the loop itself.
            .Unwrap();
        }

        /// <summary>
        /// Marks the service as stopped and lets the base class signal the stopping token,
        /// which is what ends the loops started by <see cref="ExecuteAsync"/>.
        /// </summary>
        /// <param name="cancellationToken">Indicates that the shutdown should no longer be graceful.</param>
        /// <returns>The task returned by the base implementation.</returns>
        public override Task StopAsync(CancellationToken cancellationToken)
        {
            IsStarted = false;
            // Without the base call the token passed to ExecuteAsync is never cancelled.
            return base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// Creates a delay of <paramref name="interval"/> units of <paramref name="timeType"/>.
        /// </summary>
        /// <param name="timeType">Unit of the interval.</param>
        /// <param name="interval">Number of units to wait.</param>
        /// <param name="ct">Optional token that cancels the wait.</param>
        /// <returns>A task that completes after the delay, or is cancelled with <paramref name="ct"/>.</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeType"/> is not a supported unit.</exception>
        private Task TaskDelay(TimeSpanUnit timeType, int interval, CancellationToken ct = default)
        {
            TimeSpan delay = timeType switch
            {
                TimeSpanUnit.Milliseconds => TimeSpan.FromMilliseconds(interval),
                TimeSpanUnit.Seconds => TimeSpan.FromSeconds(interval),
                TimeSpanUnit.Minutes => TimeSpan.FromMinutes(interval),
                TimeSpanUnit.Hours => TimeSpan.FromHours(interval),
                TimeSpanUnit.Days => TimeSpan.FromDays(interval),
                _ => throw new ArgumentOutOfRangeException(nameof(timeType), timeType, "Unsupported time unit."),
            };
            return Task.Delay(delay, ct);
        }
    }

    /// <summary>
    /// Custom task scheduler that executes every queued task on one dedicated background thread,
    /// so long-running work stays off the thread pool.
    /// </summary>
    internal class CustomerTaskScheduler : TaskScheduler
    {
        // BlockingCollection is just one option here; a plain queue guarded by a lock would work as well.
        private readonly BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

        /// <summary>
        /// Starts the background thread that drains the queue for the lifetime of the scheduler.
        /// </summary>
        public CustomerTaskScheduler()
        {
            var thread = new Thread(() =>
            {
                foreach (var task in _tasks.GetConsumingEnumerable())
                {
                    TryExecuteTask(task);
                }
            })
            {
                IsBackground = true
            };
            thread.Start();
        }

        /// <summary>Returns the tasks currently waiting in the queue.</summary>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks;
        }

        /// <summary>Enqueues <paramref name="task"/> for the dedicated thread.</summary>
        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }

        /// <summary>Always declines inline execution so tasks only ever run on the dedicated thread.</summary>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            return false;
        }
    }
}