455 lines
18 KiB
C#
455 lines
18 KiB
C#
using System.Reflection;
|
||
using JNPF.Common.Configuration;
|
||
using JNPF.Common.Const;
|
||
using JNPF.Common.Core.Manager;
|
||
using JNPF.Common.Enums;
|
||
using JNPF.Common.Extension;
|
||
using JNPF.Common.Filter;
|
||
using JNPF.Common.Manager;
|
||
using JNPF.Common.Security;
|
||
using JNPF.DataEncryption;
|
||
using JNPF.DependencyInjection;
|
||
using JNPF.DynamicApiController;
|
||
using JNPF.FriendlyException;
|
||
using JNPF.LinqBuilder;
|
||
using JNPF.Schedule;
|
||
using JNPF.Systems.Interfaces.System;
|
||
using JNPF.TaskScheduler.Entitys;
|
||
using JNPF.TaskScheduler.Entitys.Dto.TaskScheduler;
|
||
using JNPF.TaskScheduler.Entitys.Enum;
|
||
using JNPF.TaskScheduler.Entitys.Model;
|
||
using JNPF.TaskScheduler.Interfaces.TaskScheduler;
|
||
using JNPF.TimeCrontab;
|
||
using Mapster;
|
||
using Microsoft.AspNetCore.Mvc;
|
||
using SqlSugar;
|
||
|
||
namespace JNPF.TaskScheduler;
|
||
|
||
/// <summary>
|
||
/// 定时任务
|
||
/// 版 本:V3.4.1
|
||
/// 版 权:拓通智联科技有限公司(http://www.tuotong-tech.com)
|
||
/// 日 期:2021-06-01.
|
||
/// </summary>
|
||
[ApiDescriptionSettings(Tag = "TaskScheduler", Name = "scheduletask", Order = 220)]
|
||
[Route("api/[controller]")]
|
||
public class TimeTaskService : ITimeTaskService, IDynamicApiController, ITransient
|
||
{
|
||
private readonly ISqlSugarRepository<TimeTaskEntity> _repository;
|
||
private readonly IDataInterfaceService _dataInterfaceService;
|
||
private readonly IUserManager _userManager;
|
||
private readonly ICacheManager _cacheManager;
|
||
//private readonly ISchedulerFactory _schedulerFactory;
|
||
private readonly IDataBaseManager _dataBaseManager;
|
||
|
||
/// <summary>
|
||
/// 初始化一个<see cref="TimeTaskService"/>类型的新实例.
|
||
/// </summary>
|
||
public TimeTaskService(
|
||
ISqlSugarRepository<TimeTaskEntity> repository,
|
||
IUserManager userManager,
|
||
IDataInterfaceService dataInterfaceService,
|
||
ICacheManager cacheManager,
|
||
//ISchedulerFactory schedulerFactory,
|
||
IDataBaseManager dataBaseManager)
|
||
{
|
||
_repository = repository;
|
||
_userManager = userManager;
|
||
_dataInterfaceService = dataInterfaceService;
|
||
_cacheManager = cacheManager;
|
||
_dataBaseManager = dataBaseManager;
|
||
//_schedulerFactory = schedulerFactory;
|
||
}
|
||
|
||
#region Get
|
||
|
||
/// <summary>
|
||
/// 列表.
|
||
/// </summary>
|
||
/// <param name="input">请求参数</param>
|
||
/// <returns></returns>
|
||
[HttpGet("")]
|
||
public async Task<dynamic> GetList([FromQuery] PageInputBase input)
|
||
{
|
||
var queryWhere = LinqExpression.And<TimeTaskEntity>().And(x => x.DeleteMark == null);
|
||
if (!string.IsNullOrEmpty(input.keyword))
|
||
queryWhere = queryWhere.And(m => m.FullName.Contains(input.keyword) || m.EnCode.Contains(input.keyword));
|
||
var list = await _repository.AsQueryable().Where(queryWhere).OrderBy(x => x.CreatorTime, OrderByType.Desc)
|
||
.OrderByIF(!string.IsNullOrEmpty(input.keyword), t => t.LastModifyTime, OrderByType.Desc).ToPagedListAsync(input.currentPage, input.pageSize);
|
||
var pageList = new SqlSugarPagedList<TimeTaskListOutput>()
|
||
{
|
||
list = list.list.Adapt<List<TimeTaskListOutput>>(),
|
||
pagination = list.pagination
|
||
};
|
||
return PageResult<TimeTaskListOutput>.SqlSugarPageResult(pageList);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 列表(执行记录).
|
||
/// </summary>
|
||
/// <param name="input">请求参数.</param>
|
||
/// <param name="id">任务Id.</param>
|
||
/// <returns></returns>
|
||
[HttpGet("{id}/TaskLog")]
|
||
public async Task<dynamic> GetTaskLogList([FromQuery] TaskLogInput input, string id)
|
||
{
|
||
var whereLambda = LinqExpression.And<TimeTaskLogEntity>().And(x => x.TaskId == id);
|
||
if (input.runResult.IsNotEmptyOrNull())
|
||
whereLambda = whereLambda.And(x => x.RunResult == input.runResult);
|
||
if (input.endTime != null && input.startTime != null)
|
||
{
|
||
var start = Convert.ToDateTime(string.Format("{0:yyyy-MM-dd 00:00:00}", input.startTime?.TimeStampToDateTime()));
|
||
var end = Convert.ToDateTime(string.Format("{0:yyyy-MM-dd 23:59:59}", input.endTime?.TimeStampToDateTime()));
|
||
whereLambda = whereLambda.And(x => SqlFunc.Between(x.RunTime, start, end));
|
||
}
|
||
var list = await _repository.AsSugarClient().Queryable<TimeTaskLogEntity>().Where(whereLambda).OrderBy(x => x.RunTime, OrderByType.Desc).ToPagedListAsync(input.currentPage, input.pageSize);
|
||
var pageList = new SqlSugarPagedList<TimeTaskTaskLogListOutput>()
|
||
{
|
||
list = list.list.Adapt<List<TimeTaskTaskLogListOutput>>(),
|
||
pagination = list.pagination
|
||
};
|
||
return PageResult<TimeTaskTaskLogListOutput>.SqlSugarPageResult(pageList);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 信息.
|
||
/// </summary>
|
||
/// <param name="id">主键值.</param>
|
||
/// <returns></returns>
|
||
[HttpGet("Info/{id}")]
|
||
public async Task<dynamic> GetInfo_Api(string id)
|
||
{
|
||
return (await GetInfo(id)).Adapt<TimeTaskInfoOutput>();
|
||
}
|
||
|
||
/// <summary>
|
||
/// 本地方法.
|
||
/// </summary>
|
||
/// <returns></returns>
|
||
[HttpGet("TaskMethods")]
|
||
public async Task<dynamic> GetTaskMethodSelector()
|
||
{
|
||
return await GetTaskMethods();
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region Post
|
||
|
||
/// <summary>
|
||
/// 新建.
|
||
/// </summary>
|
||
/// <param name="input">实体对象.</param>
|
||
/// <returns></returns>
|
||
[HttpPost("")]
|
||
public async Task Create([FromBody] TimeTaskCrInput input,bool startNow = true)
|
||
{
|
||
if (await _repository.IsAnyAsync(x => (x.EnCode == input.enCode || x.FullName == input.fullName) && x.DeleteMark == null))
|
||
throw Oops.Oh(ErrorCode.COM1004);
|
||
var comtentModel = input.executeContent.ToObject<ContentModel>();
|
||
comtentModel.TenantId = _userManager.TenantId;
|
||
comtentModel.TenantDbName = _userManager.TenantDbName;
|
||
comtentModel.ConnectionConfig = _userManager.ConnectionConfig;
|
||
comtentModel.Token = _userManager.ToKen;
|
||
var entity = input.Adapt<TimeTaskEntity>();
|
||
entity.ExecuteContent = comtentModel.ToJsonString();
|
||
entity.ExecuteCycleJson = comtentModel.cron;
|
||
var result = await _repository.AsInsertable(entity).IgnoreColumns(ignoreNullColumn: true).CallEntityMethod(m => m.Create()).ExecuteReturnEntityAsync();
|
||
// var job = _repository.AsTenant().GetConnection("JNPF-Job");
|
||
// var jobDetail = new JobDetail() { };
|
||
// job.Insertable()
|
||
_ = result ?? throw Oops.Oh(ErrorCode.COM1000);
|
||
|
||
// 添加到任务调度里
|
||
AddTimerJob(result,startNow);
|
||
//await AddJob(result);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 更新.
|
||
/// </summary>
|
||
/// <param name="id">主键值</param>
|
||
/// <param name="input">实体对象</param>
|
||
/// <returns></returns>
|
||
[HttpPut("{id}")]
|
||
public async Task Update(string id, [FromBody] TimeTaskUpInput input)
|
||
{
|
||
if (await _repository.IsAnyAsync(x => x.Id != id && (x.EnCode == input.enCode || x.FullName == input.fullName) && x.DeleteMark == null))
|
||
throw Oops.Oh(ErrorCode.COM1004);
|
||
var entityOld = await GetInfo(id);
|
||
|
||
// 先从调度器里取消
|
||
SpareTime.Cancel(id);
|
||
var entityNew = input.Adapt<TimeTaskEntity>();
|
||
entityNew.RunCount = entityOld.RunCount;
|
||
var comtentModel = input.executeContent.ToObject<ContentModel>();
|
||
comtentModel.TenantId = _userManager.TenantId;
|
||
comtentModel.TenantDbName = _userManager.TenantDbName;
|
||
comtentModel.ConnectionConfig = _userManager.ConnectionConfig;
|
||
comtentModel.Token = _userManager.ToKen;
|
||
entityNew.ExecuteContent = comtentModel.ToJsonString();
|
||
entityNew.ExecuteCycleJson = comtentModel.cron;
|
||
var isOk = await _repository.AsUpdateable(entityNew).IgnoreColumns(ignoreAllNullColumns: true).CallEntityMethod(m => m.LastModify()).ExecuteCommandHasChangeAsync();
|
||
if (!isOk)
|
||
throw Oops.Oh(ErrorCode.COM1001);
|
||
|
||
// 再添加到任务调度里
|
||
if (entityNew.EnabledMark == 1)
|
||
{
|
||
AddTimerJob(entityNew);
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 删除.
|
||
/// </summary>
|
||
/// <param name="id">主键值.</param>
|
||
/// <returns></returns>
|
||
[HttpDelete("{id}")]
|
||
public async Task Delete(string id)
|
||
{
|
||
var entity = await GetInfo(id);
|
||
if (entity == null)
|
||
throw Oops.Oh(ErrorCode.COM1005);
|
||
var isOk = await _repository.AsUpdateable(entity).CallEntityMethod(m => m.Delete()).UpdateColumns(it => new { it.DeleteMark, it.DeleteTime, it.DeleteUserId }).ExecuteCommandHasChangeAsync();
|
||
if (!isOk)
|
||
throw Oops.Oh(ErrorCode.COM1002);
|
||
// 从调度器里取消
|
||
SpareTime.Cancel(entity.Id);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 停止.
|
||
/// </summary>
|
||
/// <param name="id">主键值.</param>
|
||
/// <returns></returns>
|
||
[HttpPut("{id}/Actions/Stop")]
|
||
public async Task Stop(string id)
|
||
{
|
||
var isOk = await _repository.AsUpdateable().SetColumns(it => new TimeTaskEntity()
|
||
{
|
||
EnabledMark = SqlFunc.IIF(it.EnabledMark == 1, 0, 1),
|
||
LastModifyUserId = _userManager.UserId,
|
||
LastModifyTime = SqlFunc.GetDate()
|
||
}).Where(it => it.Id.Equals(id)).ExecuteCommandHasChangeAsync();
|
||
if (!isOk)
|
||
throw Oops.Oh(ErrorCode.COM1003);
|
||
SpareTime.Stop(id);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 启动.
|
||
/// </summary>
|
||
/// <param name="id">主键值.</param>
|
||
/// <returns></returns>
|
||
[HttpPut("{id}/Actions/Enable")]
|
||
public async Task Enable(string id)
|
||
{
|
||
var entity = await GetInfo(id);
|
||
var isOk = await _repository.AsUpdateable().SetColumns(it => new TimeTaskEntity()
|
||
{
|
||
EnabledMark = SqlFunc.IIF(it.EnabledMark == 1, 0, 1),
|
||
LastModifyUserId = _userManager.UserId,
|
||
LastModifyTime = SqlFunc.GetDate()
|
||
}).Where(it => it.Id.Equals(id)).ExecuteCommandHasChangeAsync();
|
||
if (!isOk)
|
||
throw Oops.Oh(ErrorCode.COM1003);
|
||
|
||
var comtentModel = entity.ExecuteContent.ToObject<ContentModel>();
|
||
comtentModel.Token = _userManager.ToKen;
|
||
entity.ExecuteContent = comtentModel.ToJsonString();
|
||
var timer = SpareTime.GetWorkers().ToList().Find(u => u.WorkerName == id);
|
||
if (timer == null)
|
||
{
|
||
AddTimerJob(entity);
|
||
}
|
||
else
|
||
{
|
||
// 如果 StartNow 为 flase , 执行 AddTimerJob 并不会启动任务
|
||
SpareTime.Start(entity.Id);
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region PublicMethod
|
||
|
||
/// <summary>
|
||
/// 启动自启动任务.
|
||
/// </summary>
|
||
[NonAction]
|
||
public void StartTimerJob()
|
||
{
|
||
// 非多租户模式启动自启任务
|
||
if (!KeyVariable.MultiTenancy)
|
||
{
|
||
_repository.AsQueryable().Where(x => x.DeleteMark == null && x.EnabledMark == 1).ToList().ForEach(x=>AddTimerJob(x,false));//modifyby zhoukeda 20230607
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// 根据类型执行任务.
|
||
/// </summary>
|
||
/// <param name="entity">任务实体.</param>
|
||
/// <returns></returns>
|
||
[NonAction]
|
||
public async Task<string> PerformJob(TimeTaskEntity entity)
|
||
{
|
||
try
|
||
{
|
||
var model = entity.ExecuteContent.ToObject<ContentModel>();
|
||
if (!string.IsNullOrEmpty(model.Token))
|
||
{
|
||
var claims = JWTEncryption.ReadJwtToken(model.Token.Replace("Bearer ", string.Empty).Replace("bearer ", string.Empty))?.Claims;
|
||
var connectionConfig = claims.FirstOrDefault(e => e.Type == ClaimConst.CONNECTIONCONFIG)?.Value.ToObject<ConnectionConfigOptions>();
|
||
var parameters = model.parameter.ToDictionary(key => key.field, value => value.value.IsNotEmptyOrNull() ? value.value : value.defaultValue);
|
||
await _dataInterfaceService.GetResponseByType(model.interfaceId, 3, model.TenantId, null, parameters);
|
||
}
|
||
return string.Empty;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
return ex.Message;
|
||
}
|
||
}
|
||
|
||
#endregion
|
||
|
||
#region PrivateMethod
|
||
|
||
/// <summary>
|
||
/// 详情.
|
||
/// </summary>
|
||
/// <param name="id">主键.</param>
|
||
/// <returns></returns>
|
||
private async Task<TimeTaskEntity> GetInfo(string id)
|
||
{
|
||
return await _repository.GetFirstAsync(x => x.Id == id && x.DeleteMark == null);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 新增定时任务.
|
||
/// </summary>
|
||
/// <param name="input"></param>
|
||
private async void AddTimerJob(TimeTaskEntity input,bool startNow = true)
|
||
{
|
||
Action<SpareTimer, long>? action = null;
|
||
ContentModel? comtentModel = input.ExecuteContent.ToObject<ContentModel>();
|
||
input.ExecuteCycleJson = comtentModel.cron;
|
||
TaskMethodInfo? taskMethod = null;
|
||
switch (input.ExecuteType)
|
||
{
|
||
case "3":
|
||
// 查询符合条件的任务方法
|
||
taskMethod = GetTaskMethods()?.Result.FirstOrDefault(m => m.id == comtentModel.localHostTaskId);
|
||
if (taskMethod == null) break;
|
||
|
||
// 创建任务对象
|
||
object? typeInstance = Activator.CreateInstance(taskMethod.DeclaringType);
|
||
|
||
// 创建委托
|
||
action = (Action<SpareTimer, long>)Delegate.CreateDelegate(typeof(Action<SpareTimer, long>), typeInstance, taskMethod.MethodName);
|
||
break;
|
||
default:
|
||
action = async (timer, count) =>
|
||
{
|
||
var msg = await PerformJob(input);
|
||
};
|
||
break;
|
||
}
|
||
|
||
if (action == null) return;
|
||
|
||
// SpareTime.Do(comtentModel.cron, action, input.Id, comtentModel.ConnectionConfig.ToJsonString(), true, executeType: SpareTimeExecuteTypes.Parallel);
|
||
var starTime = comtentModel.startTime?.TimeStampToDateTime();
|
||
var endTime = comtentModel.endTime?.TimeStampToDateTime();
|
||
var interval = 1;
|
||
if (DateTime.Now < starTime)
|
||
{
|
||
interval = (starTime.ParseToDateTime() - DateTime.Now).TotalMilliseconds.ParseToInt();
|
||
}
|
||
if (startNow) //modifyby zhoukeda 20230516
|
||
{
|
||
SpareTime.DoOnce(interval, action, "Once_" + input.Id);
|
||
}
|
||
|
||
Func<DateTimeOffset?> nextHandle = null;
|
||
var isRun = comtentModel.endTime.IsNullOrEmpty() ? DateTime.Now >= starTime : DateTime.Now >= starTime && DateTime.Now < endTime;
|
||
if (isRun)
|
||
{
|
||
nextHandle = ()=>SpareTime.GetCronNextOccurrence(comtentModel.cron);
|
||
}
|
||
SpareTime.Do(nextHandle
|
||
,
|
||
action, input.Id, comtentModel.ConnectionConfig.ToJsonString(), true, executeType: SpareTimeExecuteTypes.Parallel, cancelInNoneNextTime: false);
|
||
}
|
||
|
||
private async Task AddJob(TimeTaskEntity input)
|
||
{
|
||
//ContentModel? comtentModel = input.ExecuteContent.ToObject<ContentModel>();
|
||
//var starTime = comtentModel.startTime?.TimeStampToDateTime();
|
||
//var endTime = comtentModel.endTime?.TimeStampToDateTime();
|
||
//var jobBuilder = JobBuilder.Create(async (context, stoppingToken) =>
|
||
//{
|
||
// Console.WriteLine(string.Format("{0}在执行任务,执行时间{1},触发时间:{1}", context.JobId, context.ExecutingTime, context.OccurrenceTime));
|
||
// if (!input.ExecuteType.Equals("3"))
|
||
// {
|
||
// var msg = await PerformJob(input);
|
||
// }
|
||
//});
|
||
//jobBuilder.SetJobId(input.Id);
|
||
//jobBuilder.SetGroupName(input.FullName);
|
||
//var triggerBuilder = TriggerBuilder.Create<CronTrigger>(input.ExecuteCycleJson, CronStringFormat.WithSeconds);
|
||
//triggerBuilder.SetStartTime(starTime);
|
||
//triggerBuilder.SetEndTime(endTime);
|
||
////triggerBuilder.SetRunOnStart(true);
|
||
//_schedulerFactory.AddJob(jobBuilder, triggerBuilder);
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取所有本地任务.
|
||
/// </summary>
|
||
/// <returns></returns>
|
||
private async Task<List<TaskMethodInfo>> GetTaskMethods()
|
||
{
|
||
// var taskMethods = await _cacheManager.GetAsync<List<TaskMethodInfo>>(CommonConst.CACHEKEYTIMERJOB);
|
||
// if (taskMethods != null) return taskMethods;
|
||
|
||
List<TaskMethodInfo> taskMethods = null;
|
||
|
||
// 获取所有本地任务方法,必须有spareTimeAttribute特性
|
||
taskMethods = App.EffectiveTypes
|
||
.Where(u => u.IsClass && !u.IsInterface && !u.IsAbstract && typeof(ISpareTimeWorker).IsAssignableFrom(u))
|
||
.SelectMany(u => u.GetMethods(BindingFlags.Public | BindingFlags.Instance)
|
||
.Where(m => m.GetCustomAttributes(typeof(SpareTimeAttribute), false).ToString().Contains("SpareTime") &&
|
||
m.GetParameters().Length == 2 &&
|
||
m.GetParameters()[0].ParameterType == typeof(SpareTimer) &&
|
||
m.GetParameters()[1].ParameterType == typeof(long) && m.ReturnType == typeof(void))
|
||
.Select(m =>
|
||
{
|
||
// 默认获取第一条任务特性
|
||
var spareTimeAttribute = m.GetCustomAttribute<SpareTimeAttribute>();
|
||
return new TaskMethodInfo
|
||
{
|
||
id = $"{m.DeclaringType.Name}/{m.Name}",
|
||
fullName = spareTimeAttribute.WorkerName,
|
||
RequestUrl = $"{m.DeclaringType.Name}/{m.Name}",
|
||
cron = spareTimeAttribute.CronExpression,
|
||
DoOnce = spareTimeAttribute.DoOnce,
|
||
ExecuteType = spareTimeAttribute.ExecuteType,
|
||
Interval = (int)spareTimeAttribute.Interval / 1000,
|
||
StartNow = spareTimeAttribute.StartNow,
|
||
RequestType = RequestTypeEnum.BuiltIn,
|
||
Remark = spareTimeAttribute.Description,
|
||
TimerType = string.IsNullOrEmpty(spareTimeAttribute.CronExpression) ? SpareTimeTypes.Interval : SpareTimeTypes.Cron,
|
||
MethodName = m.Name,
|
||
DeclaringType = m.DeclaringType
|
||
};
|
||
})).ToList();
|
||
await _cacheManager.SetAsync(CommonConst.CACHEKEYTIMERJOB, taskMethods);
|
||
return taskMethods;
|
||
}
|
||
|
||
#endregion
|
||
} |