Files
tnb.server/taskschedule/Tnb.TaskScheduler/TimeTaskService.cs
2023-05-31 10:19:05 +08:00

453 lines
18 KiB
C#
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
using System.Reflection;
using JNPF.Common.Configuration;
using JNPF.Common.Const;
using JNPF.Common.Core.Manager;
using JNPF.Common.Enums;
using JNPF.Common.Extension;
using JNPF.Common.Filter;
using JNPF.Common.Manager;
using JNPF.Common.Security;
using JNPF.DataEncryption;
using JNPF.DependencyInjection;
using JNPF.DynamicApiController;
using JNPF.FriendlyException;
using JNPF.LinqBuilder;
using JNPF.Schedule;
using JNPF.Systems.Interfaces.System;
using JNPF.TaskScheduler.Entitys;
using JNPF.TaskScheduler.Entitys.Dto.TaskScheduler;
using JNPF.TaskScheduler.Entitys.Model;
using JNPF.TaskScheduler.Interfaces.TaskScheduler;
using JNPF.TimeCrontab;
using Mapster;
using Microsoft.AspNetCore.Mvc;
using SqlSugar;
namespace JNPF.TaskScheduler;
/// <summary>
/// Scheduled task service.
/// Version: V3.4.1
/// Copyright: 拓通智联科技有限公司 (http://www.tuotong-tech.com)
/// Date: 2021-06-01.
/// </summary>
[ApiDescriptionSettings(Tag = "TaskScheduler", Name = "scheduletask", Order = 220)]
[Route("api/[controller]")]
public class TimeTaskService : ITimeTaskService, IDynamicApiController, ITransient
{
// Repository over TimeTaskEntity; also used to reach the underlying SqlSugar client for task-log queries.
private readonly ISqlSugarRepository<TimeTaskEntity> _repository;
// Executes the data interface referenced by a task (called from PerformJob).
private readonly IDataInterfaceService _dataInterfaceService;
// Supplies the current tenant id, tenant database name, connection config, user id and token.
private readonly IUserManager _userManager;
// Assigned in the constructor; only referenced by the commented-out cache logic in GetTaskMethods.
private readonly ICacheManager _cacheManager;
// Scheduler-factory injection is currently disabled.
//private readonly ISchedulerFactory _schedulerFactory;
// Assigned in the constructor; not used by any code visible in this class.
private readonly IDataBaseManager _dataBaseManager;
/// <summary>
/// Initializes a new instance of the <see cref="TimeTaskService"/> class.
/// </summary>
/// <param name="repository">Repository over <see cref="TimeTaskEntity"/>.</param>
/// <param name="userManager">Provider of the current user and tenant context.</param>
/// <param name="dataInterfaceService">Service used to invoke the data interface bound to a task.</param>
/// <param name="cacheManager">Cache manager (stored; currently only referenced by disabled code).</param>
/// <param name="dataBaseManager">Database manager (stored; not used by the visible code).</param>
public TimeTaskService(
    ISqlSugarRepository<TimeTaskEntity> repository,
    IUserManager userManager,
    IDataInterfaceService dataInterfaceService,
    ICacheManager cacheManager,
    IDataBaseManager dataBaseManager)
{
    // Note: the ISchedulerFactory dependency is intentionally not injected at the moment.
    (_repository, _userManager, _dataInterfaceService, _cacheManager, _dataBaseManager) =
        (repository, userManager, dataInterfaceService, cacheManager, dataBaseManager);
}
#region Get
/// <summary>
/// Returns one page of scheduled tasks whose <c>DeleteMark</c> is null.
/// </summary>
/// <param name="input">Paging arguments plus an optional keyword matched against the task name or code.</param>
/// <returns>The page of <see cref="TimeTaskListOutput"/> items wrapped by <c>PageResult</c>.</returns>
[HttpGet("")]
public async Task<dynamic> GetList([FromQuery] PageInputBase input)
{
    var hasKeyword = !string.IsNullOrEmpty(input.keyword);

    // Base filter: only rows without a delete mark.
    var filter = LinqExpression.And<TimeTaskEntity>().And(task => task.DeleteMark == null);
    if (hasKeyword)
    {
        filter = filter.And(task => task.FullName.Contains(input.keyword) || task.EnCode.Contains(input.keyword));
    }

    // Newest first; when searching, the last-modified time is added as a further sort key.
    var page = await _repository.AsQueryable()
        .Where(filter)
        .OrderBy(task => task.CreatorTime, OrderByType.Desc)
        .OrderByIF(hasKeyword, task => task.LastModifyTime, OrderByType.Desc)
        .ToPagedListAsync(input.currentPage, input.pageSize);

    var output = new SqlSugarPagedList<TimeTaskListOutput>
    {
        list = page.list.Adapt<List<TimeTaskListOutput>>(),
        pagination = page.pagination
    };
    return PageResult<TimeTaskListOutput>.SqlSugarPageResult(output);
}
/// <summary>
/// Returns one page of execution-log entries for a task, newest run first.
/// </summary>
/// <param name="input">Paging arguments plus optional run-result and start/end time filters.</param>
/// <param name="id">Id of the task whose log is requested.</param>
/// <returns>The page of <see cref="TimeTaskTaskLogListOutput"/> items wrapped by <c>PageResult</c>.</returns>
[HttpGet("{id}/TaskLog")]
public async Task<dynamic> GetTaskLogList([FromQuery] TaskLogInput input, string id)
{
    var filter = LinqExpression.And<TimeTaskLogEntity>().And(log => log.TaskId == id);

    if (input.runResult.IsNotEmptyOrNull())
    {
        filter = filter.And(log => log.RunResult == input.runResult);
    }

    // The time window is applied only when both bounds were supplied;
    // it is widened to whole days (00:00:00 of the start day to 23:59:59 of the end day).
    if (input.endTime != null && input.startTime != null)
    {
        var windowStart = Convert.ToDateTime(string.Format("{0:yyyy-MM-dd 00:00:00}", input.startTime?.TimeStampToDateTime()));
        var windowEnd = Convert.ToDateTime(string.Format("{0:yyyy-MM-dd 23:59:59}", input.endTime?.TimeStampToDateTime()));
        filter = filter.And(log => SqlFunc.Between(log.RunTime, windowStart, windowEnd));
    }

    var page = await _repository.AsSugarClient()
        .Queryable<TimeTaskLogEntity>()
        .Where(filter)
        .OrderBy(log => log.RunTime, OrderByType.Desc)
        .ToPagedListAsync(input.currentPage, input.pageSize);

    var output = new SqlSugarPagedList<TimeTaskTaskLogListOutput>
    {
        list = page.list.Adapt<List<TimeTaskTaskLogListOutput>>(),
        pagination = page.pagination
    };
    return PageResult<TimeTaskTaskLogListOutput>.SqlSugarPageResult(output);
}
/// <summary>
/// Returns the details of a single task.
/// </summary>
/// <param name="id">Primary key of the task.</param>
/// <returns>The task mapped to <see cref="TimeTaskInfoOutput"/>.</returns>
[HttpGet("Info/{id}")]
public async Task<dynamic> GetInfo_Api(string id)
{
    var entity = await GetInfo(id);
    return entity.Adapt<TimeTaskInfoOutput>();
}
/// <summary>
/// Returns the local task methods that can be selected for a task.
/// </summary>
/// <returns>The list produced by <c>GetTaskMethods</c>.</returns>
[HttpGet("TaskMethods")]
public async Task<dynamic> GetTaskMethodSelector()
{
    List<TaskMethodInfo> methods = await GetTaskMethods();
    return methods;
}
#endregion
#region Post
/// <summary>
/// Creates a scheduled task and registers it with the in-process scheduler.
/// </summary>
/// <param name="input">The task definition posted by the client.</param>
/// <returns>A task that completes once the row is stored and the timer job has been added.</returns>
[HttpPost("")]
public async Task Create([FromBody] TimeTaskCrInput input)
{
    // Reject a code or name that is already used by a non-deleted task.
    var duplicateExists = await _repository.IsAnyAsync(x => (x.EnCode == input.enCode || x.FullName == input.fullName) && x.DeleteMark == null);
    if (duplicateExists)
    {
        throw Oops.Oh(ErrorCode.COM1004);
    }

    // Stamp the caller's tenant context and token into the execute content.
    var content = input.executeContent.ToObject<ContentModel>();
    content.TenantId = _userManager.TenantId;
    content.TenantDbName = _userManager.TenantDbName;
    content.ConnectionConfig = _userManager.ConnectionConfig;
    content.Token = _userManager.ToKen;

    var entity = input.Adapt<TimeTaskEntity>();
    entity.ExecuteContent = content.ToJsonString();
    entity.ExecuteCycleJson = content.cron;

    var inserted = await _repository.AsInsertable(entity)
        .IgnoreColumns(ignoreNullColumn: true)
        .CallEntityMethod(m => m.Create())
        .ExecuteReturnEntityAsync();
    if (inserted is null)
    {
        throw Oops.Oh(ErrorCode.COM1000);
    }

    // Hand the stored task to the scheduler.
    AddTimerJob(inserted);
}
/// <summary>
/// Updates a scheduled task and re-registers it with the in-process scheduler.
/// </summary>
/// <param name="id">Primary key of the task being updated.</param>
/// <param name="input">The new task definition.</param>
/// <returns>A task that completes once the row is updated and the scheduler has been refreshed.</returns>
/// <remarks>
/// Throws <c>COM1004</c> when another non-deleted task already uses the code or name,
/// <c>COM1005</c> when the task does not exist (the same code Delete raises for a missing task),
/// and <c>COM1001</c> when the update changes no row.
/// </remarks>
[HttpPut("{id}")]
public async Task Update(string id, [FromBody] TimeTaskUpInput input)
{
    if (await _repository.IsAnyAsync(x => x.Id != id && (x.EnCode == input.enCode || x.FullName == input.fullName) && x.DeleteMark == null))
    {
        throw Oops.Oh(ErrorCode.COM1004);
    }

    // A missing (or deleted) task used to surface as a NullReferenceException below.
    var entityOld = await GetInfo(id);
    if (entityOld == null)
    {
        throw Oops.Oh(ErrorCode.COM1005);
    }

    var entityNew = input.Adapt<TimeTaskEntity>();
    entityNew.RunCount = entityOld.RunCount;

    // Stamp the caller's tenant context and token into the execute content.
    var content = input.executeContent.ToObject<ContentModel>();
    content.TenantId = _userManager.TenantId;
    content.TenantDbName = _userManager.TenantDbName;
    content.ConnectionConfig = _userManager.ConnectionConfig;
    content.Token = _userManager.ToKen;
    entityNew.ExecuteContent = content.ToJsonString();
    entityNew.ExecuteCycleJson = content.cron;

    var isOk = await _repository.AsUpdateable(entityNew)
        .IgnoreColumns(ignoreAllNullColumns: true)
        .CallEntityMethod(m => m.LastModify())
        .ExecuteCommandHasChangeAsync();
    if (!isOk)
    {
        throw Oops.Oh(ErrorCode.COM1001);
    }

    // Only touch the scheduler once the row is persisted, so a failed update
    // leaves the previously scheduled job in place.
    SpareTime.Cancel(id);
    // AddTimerJob also registers a one-shot worker named "Once_" + id; drop a still-pending
    // one so it cannot fire with the stale definition or block the re-registration.
    SpareTime.Cancel("Once_" + id);

    // Re-register only when the task is enabled.
    if (entityNew.EnabledMark == 1)
    {
        AddTimerJob(entityNew);
    }
}
/// <summary>
/// Marks a scheduled task as deleted and removes its workers from the scheduler.
/// </summary>
/// <param name="id">Primary key of the task.</param>
/// <returns>A task that completes once the row is marked and the workers are cancelled.</returns>
/// <remarks>
/// Throws <c>COM1005</c> when the task does not exist and <c>COM1002</c> when the update changes no row.
/// </remarks>
[HttpDelete("{id}")]
public async Task Delete(string id)
{
    var entity = await GetInfo(id);
    if (entity == null)
    {
        throw Oops.Oh(ErrorCode.COM1005);
    }

    // Only the delete-related columns are written.
    var isOk = await _repository.AsUpdateable(entity)
        .CallEntityMethod(m => m.Delete())
        .UpdateColumns(it => new { it.DeleteMark, it.DeleteTime, it.DeleteUserId })
        .ExecuteCommandHasChangeAsync();
    if (!isOk)
    {
        throw Oops.Oh(ErrorCode.COM1002);
    }

    // Remove the recurring worker ...
    SpareTime.Cancel(entity.Id);
    // ... and the one-shot worker AddTimerJob registers as "Once_" + id, which would
    // otherwise still fire for a deleted task if its start time has not been reached yet.
    SpareTime.Cancel("Once_" + entity.Id);
}
/// <summary>
/// Disables a scheduled task and stops its worker.
/// </summary>
/// <param name="id">Primary key of the task.</param>
/// <returns>A task that completes once the row is updated and the worker is stopped.</returns>
/// <remarks>Throws <c>COM1003</c> when the update changes no row.</remarks>
[HttpPut("{id}/Actions/Stop")]
public async Task Stop(string id)
{
    var isOk = await _repository.AsUpdateable().SetColumns(it => new TimeTaskEntity()
    {
        // Always store "disabled". The previous toggle (1 -> 0, anything else -> 1) flipped an
        // already-stopped task back to enabled in the database while its worker was being stopped.
        EnabledMark = 0,
        LastModifyUserId = _userManager.UserId,
        LastModifyTime = SqlFunc.GetDate()
    }).Where(it => it.Id.Equals(id)).ExecuteCommandHasChangeAsync();
    if (!isOk)
    {
        throw Oops.Oh(ErrorCode.COM1003);
    }

    SpareTime.Stop(id);
}
/// <summary>
/// Enables a scheduled task and starts (or registers) its worker.
/// </summary>
/// <param name="id">Primary key of the task.</param>
/// <returns>A task that completes once the row is updated and the worker is running.</returns>
/// <remarks>
/// Throws <c>COM1005</c> when the task does not exist (the same code Delete raises for a missing task)
/// and <c>COM1003</c> when the update changes no row.
/// </remarks>
[HttpPut("{id}/Actions/Enable")]
public async Task Enable(string id)
{
    // Validate before writing: a missing task used to be updated first and then
    // fail with a NullReferenceException when its content was read.
    var entity = await GetInfo(id);
    if (entity == null)
    {
        throw Oops.Oh(ErrorCode.COM1005);
    }

    var isOk = await _repository.AsUpdateable().SetColumns(it => new TimeTaskEntity()
    {
        // Always store "enabled". The previous toggle (1 -> 0, anything else -> 1) disabled an
        // already-enabled task in the database while its worker was being started.
        EnabledMark = 1,
        LastModifyUserId = _userManager.UserId,
        LastModifyTime = SqlFunc.GetDate()
    }).Where(it => it.Id.Equals(id)).ExecuteCommandHasChangeAsync();
    if (!isOk)
    {
        throw Oops.Oh(ErrorCode.COM1003);
    }

    // Refresh the token on the in-memory copy handed to AddTimerJob;
    // it is not written back to the database here.
    var content = entity.ExecuteContent.ToObject<ContentModel>();
    content.Token = _userManager.ToKen;
    entity.ExecuteContent = content.ToJsonString();

    var worker = SpareTime.GetWorkers().FirstOrDefault(u => u.WorkerName == id);
    if (worker == null)
    {
        // No worker registered yet: create one.
        AddTimerJob(entity);
    }
    else
    {
        // A worker already exists; per the original note, when StartNow is false
        // AddTimerJob would not start the task, so it is started explicitly.
        SpareTime.Start(entity.Id);
    }
}
#endregion
#region PublicMethod
/// <summary>
/// Entry point for starting auto-start tasks. Currently a no-op in both modes.
/// </summary>
[NonAction]
public void StartTimerJob()
{
    // Auto-start only applies when multi-tenancy is off.
    if (KeyVariable.MultiTenancy)
    {
        return;
    }

    // Single-tenant auto-start is disabled for now; the original call is kept for reference:
    // _repository.AsQueryable().Where(x => x.DeleteMark == null && x.EnabledMark == 1).ToList().ForEach(AddTimerJob);
}
/// <summary>
/// Runs the data interface configured on a task.
/// </summary>
/// <param name="entity">The task whose <c>ExecuteContent</c> describes what to call.</param>
/// <returns>
/// An empty string when the call completed or when the content carries no token;
/// otherwise the message of the exception that was caught.
/// </returns>
[NonAction]
public async Task<string> PerformJob(TimeTaskEntity entity)
{
    try
    {
        var content = entity.ExecuteContent.ToObject<ContentModel>();

        // Without a token nothing is executed.
        if (string.IsNullOrEmpty(content.Token))
        {
            return string.Empty;
        }

        // Strip the scheme prefix before reading the JWT.
        var rawToken = content.Token.Replace("Bearer ", string.Empty).Replace("bearer ", string.Empty);
        var tokenClaims = JWTEncryption.ReadJwtToken(rawToken)?.Claims;

        // The connection config is parsed from the claims but not used afterwards;
        // the lookup is kept because a failure here ends the run with an error message.
        var configClaim = tokenClaims.FirstOrDefault(e => e.Type == ClaimConst.CONNECTIONCONFIG);
        _ = configClaim?.Value.ToObject<ConnectionConfigOptions>();

        // Each parameter falls back to its default when no explicit value is set.
        var arguments = content.parameter.ToDictionary(
            p => p.field,
            p => p.value.IsNotEmptyOrNull() ? p.value : p.defaultValue);

        await _dataInterfaceService.GetResponseByType(content.interfaceId, 3, content.TenantId, null, arguments);
        return string.Empty;
    }
    catch (Exception failure)
    {
        // Failures are reported to the caller as text instead of being rethrown.
        return failure.Message;
    }
}
#endregion
#region PrivateMethod
/// <summary>
/// Loads a single task by primary key, skipping rows that carry a delete mark.
/// </summary>
/// <param name="id">Primary key of the task.</param>
/// <returns>The first matching entity as returned by the repository.</returns>
private async Task<TimeTaskEntity> GetInfo(string id)
    => await _repository.GetFirstAsync(task => task.Id == id && task.DeleteMark == null);
/// <summary>
/// Registers a task with the in-process scheduler.
/// </summary>
/// <remarks>
/// Two workers are registered: a one-shot worker named <c>"Once_" + input.Id</c> and a
/// recurring worker named <c>input.Id</c> whose next run comes from the cron expression
/// while the current time lies inside the configured start/end window.
/// The method is synchronous. It was previously declared <c>async void</c> without awaiting
/// anything, so a failure during registration could not be observed by the calling action.
/// </remarks>
/// <param name="input">The task to schedule; its <c>ExecuteCycleJson</c> is overwritten with the cron expression from its content.</param>
private void AddTimerJob(TimeTaskEntity input)
{
    var content = input.ExecuteContent.ToObject<ContentModel>();
    input.ExecuteCycleJson = content.cron;

    Action<SpareTimer, long>? action;
    switch (input.ExecuteType)
    {
        case "3":
            // Local task method: look it up by id. GetTaskMethods is awaited synchronously
            // because this method must stay callable as a plain statement.
            TaskMethodInfo? taskMethod = GetTaskMethods().GetAwaiter().GetResult()
                .FirstOrDefault(m => m.id == content.localHostTaskId);
            if (taskMethod == null)
            {
                // Unknown method: nothing to schedule.
                return;
            }

            // Instantiate the declaring type and bind the method as the worker delegate.
            object? typeInstance = Activator.CreateInstance(taskMethod.DeclaringType);
            action = (Action<SpareTimer, long>)Delegate.CreateDelegate(typeof(Action<SpareTimer, long>), typeInstance, taskMethod.MethodName);
            break;

        default:
            // Every other type runs through PerformJob, which reports failures as text
            // rather than throwing; the text is not used here.
            action = async (timer, count) =>
            {
                await PerformJob(input);
            };
            break;
    }

    var starTime = content.startTime?.TimeStampToDateTime();
    var endTime = content.endTime?.TimeStampToDateTime();

    // One-shot run: after 1 ms, or at the start time when that is still in the future.
    var interval = 1;
    if (DateTime.Now < starTime)
    {
        interval = (starTime.ParseToDateTime() - DateTime.Now).TotalMilliseconds.ParseToInt();
    }

    SpareTime.DoOnce(interval, action, "Once_" + input.Id);

    // Recurring run: a next occurrence is only produced inside the start/end window
    // (the end bound is ignored when no end time is configured).
    // NOTE(review): the positional `true` is presumably StartNow, and cancelInNoneNextTime: false
    // presumably keeps the worker registered while no next time is returned - confirm against SpareTime.Do.
    SpareTime.Do(
        () =>
        {
            var isRun = content.endTime.IsNullOrEmpty() ? DateTime.Now >= starTime : DateTime.Now >= starTime && DateTime.Now < endTime;
            if (isRun)
            {
                return SpareTime.GetCronNextOccurrence(content.cron);
            }

            return null;
        },
        action, input.Id, content.ConnectionConfig.ToJsonString(), true, executeType: SpareTimeExecuteTypes.Parallel, cancelInNoneNextTime: false);
}
/// <summary>
/// Placeholder for registering a task through a scheduler factory.
/// The whole implementation is commented out, so the method currently does nothing.
/// </summary>
/// <param name="input">The task that would be scheduled.</param>
/// <returns>A task that completes immediately.</returns>
private async Task AddJob(TimeTaskEntity input)
{
// Disabled implementation, kept for reference:
//ContentModel? comtentModel = input.ExecuteContent.ToObject<ContentModel>();
//var starTime = comtentModel.startTime?.TimeStampToDateTime();
//var endTime = comtentModel.endTime?.TimeStampToDateTime();
//var jobBuilder = JobBuilder.Create(async (context, stoppingToken) =>
//{
// Console.WriteLine(string.Format("{0} is executing the job, execution time: {1}, trigger time: {1}", context.JobId, context.ExecutingTime, context.OccurrenceTime));
// if (!input.ExecuteType.Equals("3"))
// {
// var msg = await PerformJob(input);
// }
//});
//jobBuilder.SetJobId(input.Id);
//jobBuilder.SetGroupName(input.FullName);
//var triggerBuilder = TriggerBuilder.Create<CronTrigger>(input.ExecuteCycleJson, CronStringFormat.WithSeconds);
//triggerBuilder.SetStartTime(starTime);
//triggerBuilder.SetEndTime(endTime);
////triggerBuilder.SetRunOnStart(true);
//_schedulerFactory.AddJob(jobBuilder, triggerBuilder);
}
/// <summary>
/// Returns all local task methods.
/// The discovery and caching logic is commented out, so an empty list is always returned;
/// as a consequence AddTimerJob never finds a method for tasks with ExecuteType "3".
/// </summary>
/// <returns>An empty list of <see cref="TaskMethodInfo"/>.</returns>
private async Task<List<TaskMethodInfo>> GetTaskMethods()
{
// Disabled implementation, kept for reference:
//var taskMethods = await _cacheManager.GetAsync<List<TaskMethodInfo>>(CommonConst.CACHEKEYTIMERJOB);
//if (taskMethods != null) return taskMethods;
//// Collect all local task methods; each must carry the SpareTimeAttribute.
//taskMethods = App.EffectiveTypes
// .Where(u => u.IsClass && !u.IsInterface && !u.IsAbstract && typeof(ISpareTimeWorker).IsAssignableFrom(u))
// .SelectMany(u => u.GetMethods(BindingFlags.Public | BindingFlags.Instance)
// .Where(m => m.IsDefined(typeof(SpareTimeAttribute), false) &&
// m.GetParameters().Length == 2 &&
// m.GetParameters()[0].ParameterType == typeof(SpareTimer) &&
// m.GetParameters()[1].ParameterType == typeof(long) && m.ReturnType == typeof(void))
// .Select(m =>
// {
// // Use the first task attribute by default.
// var spareTimeAttribute = m.GetCustomAttribute<SpareTimeAttribute>();
// return new TaskMethodInfo
// {
// id = $"{m.DeclaringType.Name}/{m.Name}",
// fullName = spareTimeAttribute.WorkerName,
// RequestUrl = $"{m.DeclaringType.Name}/{m.Name}",
// cron = spareTimeAttribute.CronExpression,
// DoOnce = spareTimeAttribute.DoOnce,
// ExecuteType = spareTimeAttribute.ExecuteType,
// Interval = (int)spareTimeAttribute.Interval / 1000,
// StartNow = spareTimeAttribute.StartNow,
// RequestType = RequestTypeEnum.BuiltIn,
// Remark = spareTimeAttribute.Description,
// TimerType = string.IsNullOrEmpty(spareTimeAttribute.CronExpression) ? SpareTimeTypes.Interval : SpareTimeTypes.Cron,
// MethodName = m.Name,
// DeclaringType = m.DeclaringType
// };
// })).ToList();
//await _cacheManager.SetAsync(CommonConst.CACHEKEYTIMERJOB, taskMethods);
//return taskMethods;
return new List<TaskMethodInfo>();
}
#endregion
}