using System.Collections.Concurrent; using Aop.Api.Domain; using JNPF; using JNPF.Common.Extension; using JNPF.Common.Security; using JNPF.EventBus; using JNPF.EventHandler; using JNPF.Logging; using JNPF.Systems.Entitys.System; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Newtonsoft.Json.Linq; using NPOI.OpenXmlFormats.Dml.Diagram; using NPOI.SS.Format; using Tnb.Common.Extension; using Tnb.Common.Utils; using Tnb.WarehouseMgr.Entities.Configs; using Tnb.WarehouseMgr.Entities.Consts; using Tnb.WarehouseMgr.Entities.Enums; using Tnb.WarehouseMgr.Entities.Exceptions; namespace Tnb.WarehouseMgr { /// /// 定时任务 /// added by ly on 20230802 /// public class TimedTaskBackgroundService : BackgroundService { public bool IsStarted { get; set; } private IEventPublisher _eventPublisher = default!; private readonly ElevatorControlConfiguration _elevatorControlConfiguration = App.Configuration.Build(); private readonly IServiceProvider _serviceProvider; private readonly IHostApplicationLifetime _lifeTime; private List s_heartbeatDevNames = new(); //private static Dictionary> _timedFuncMap = new(StringComparer.OrdinalIgnoreCase); public static Dictionary elevatorStatus = new Dictionary(); public TimedTaskBackgroundService() { s_heartbeatDevNames = _elevatorControlConfiguration.HeartbeatDevNames; if (elevatorStatus.Count == 0) foreach (var devName in s_heartbeatDevNames) elevatorStatus.Add(devName, false); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { IsStarted = true; Task timedTask = Task.Run(() => { _eventPublisher = App.GetRequiredService(); //电梯Agv心跳检测 _ = TimedTask(async token => { foreach (var devName in s_heartbeatDevNames) { Dictionary parameter = new() { ["DevName"] = devName, ["TagName"] = ElevatorConsts.AGVKeepalive, ["Value"] = "123", ["token"] = _elevatorControlConfiguration.token }; Log.Information($"{devName.Match(@"\d+")}#梯, 心跳检测"); try { string result = await HttpClientHelper.GetAsync(_elevatorControlConfiguration.WriteTagUrl, pars: parameter); Log.Information($"{devName.Match(@"\d+")}#梯, 心跳检测结果:{result}"); JObject resObj = JObject.Parse(result); elevatorStatus[devName] = resObj["Result"].ToString() == "Ok"; //var result = await RedisHelper.HSetAsync(devName, ElevatorConsts.AGVKeepalive, "123"); await Console.Out.WriteLineAsync($"{devName.Match(@"\d+")}#梯, 心跳检测结果:{result}"); } catch (Exception ex) { Log.Error($"{devName.Match(@"\d+")}#梯, 心跳检测结果:{ex.Message}"); elevatorStatus[devName] = false; } } }, stoppingToken, 30); }); return timedTask; } private Task TimedTask(Func action, CancellationToken ct, int interval, TimeSpanUnit timeType = TimeSpanUnit.Seconds) { // CancellationToken token = ct; // return Task.Factory.StartNew(async () => // { // while (!token.IsCancellationRequested) // { // await action(ct).Catch(async ex => // { // if (ex is TimedTaskException timedTaskEx and not null) // { // await _eventPublisher.PublishAsync(new LogEventSource("Log:CreateExLog", timedTaskEx.options!, new SysLogEntity // { // Id = SnowflakeIdHelper.NextId(), // Category = 4, // UserId = timedTaskEx.UserId, // UserName = timedTaskEx.UserName, // IPAddress = NetHelper.Ip, // RequestURL = timedTaskEx.RequestURL, // RequestMethod = timedTaskEx.RequestMethod, // Json = timedTaskEx + "\n" + timedTaskEx.InnerException?.StackTrace + "\n" + timedTaskEx?.TargetSite?.GetParameters().ToString(), // //PlatForm = string.Format("{0}-{1}", userAgent.OS.ToString(), userAgent.RawValue), // CreatorTime = DateTime.Now // })); // } // }); // await TaskDelay(timeType, interval); // } // }, ct, TaskCreationOptions.None, new CustomerTaskScheduler()); return Task.Run(async () => { while (!ct.IsCancellationRequested) { await action(ct).Catch(async ex => { if (ex is TimedTaskException timedTaskEx and not null) { await _eventPublisher.PublishAsync(new LogEventSource("Log:CreateExLog", timedTaskEx.options!, new SysLogEntity { Id = SnowflakeIdHelper.NextId(), Category = 4, UserId = timedTaskEx.UserId, UserName = timedTaskEx.UserName, IPAddress = NetHelper.Ip, RequestURL = timedTaskEx.RequestURL, RequestMethod = timedTaskEx.RequestMethod, Json = timedTaskEx + "\n" + timedTaskEx.InnerException?.StackTrace + "\n" + timedTaskEx?.TargetSite?.GetParameters().ToString(), //PlatForm = string.Format("{0}-{1}", userAgent.OS.ToString(), userAgent.RawValue), CreatorTime = DateTime.Now })); } }); await TaskDelay(timeType, interval); } }, ct); } public override Task StopAsync(CancellationToken cancellationToken) { IsStarted = false; return Task.CompletedTask; } private Task TaskDelay(TimeSpanUnit timeType, int interval) { Task delayTask = timeType switch { TimeSpanUnit.Milliseconds => Task.Delay(TimeSpan.FromMilliseconds(interval)), TimeSpanUnit.Seconds => Task.Delay(TimeSpan.FromSeconds(interval)), TimeSpanUnit.Minutes => Task.Delay(TimeSpan.FromMinutes(interval)), TimeSpanUnit.Hours => Task.Delay(TimeSpan.FromHours(interval)), TimeSpanUnit.Days => Task.Delay(TimeSpan.FromDays(interval)), _ => throw new NotImplementedException() }; return delayTask; } public Task CloseAgvHeartbeat(IEnumerable devNames) { s_heartbeatDevNames.RemoveAll(x => devNames.Contains(x)); return Task.CompletedTask; } public Task OpenAgvHeartbeat(IEnumerable devNames) { s_heartbeatDevNames.AddRange(devNames); return Task.CompletedTask; } } /// /// 自定义任务调度器,保证长任务在单独的线程中运行 /// internal class CustomerTaskScheduler : TaskScheduler { // 这边的 BlockingCollection 只是举个例子,如果是普通的队列,配合锁也是可以的。 private readonly BlockingCollection _tasks = new(); public CustomerTaskScheduler() { Thread thread = new(() => { foreach (Task task in _tasks.GetConsumingEnumerable()) { _ = TryExecuteTask(task); } }) { IsBackground = true }; thread.Start(); } protected override IEnumerable GetScheduledTasks() { return _tasks; } protected override void QueueTask(Task task) { _tasks.Add(task); } protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } } }