定时任务
1. 概述
定时任务是梵医云系统中用于执行周期性任务的核心模块。系统提供了两种定时任务实现方式:
- XXL-Job:分布式任务调度框架,适用于大规模分布式环境
- Spring @Scheduled:Spring 自带的定时任务,适用于简单的定时任务
定时任务广泛应用于订单处理、支付通知、数据统计、日志清理、优惠券过期等业务场景。
2. 技术架构
2.1 XXL-Job 架构
XXL-Job 是一个分布式任务调度平台,具有以下特点:
- 调度中心:负责任务的调度和分发
- 执行器:负责任务的实际执行
- 任务:具体的业务逻辑实现
2.2 组件说明
| 组件 | 说明 | 位置 |
|---|---|---|
| FanyiXxlJobAutoConfiguration | XXL-Job 自动配置类 | fanyi-spring-boot-starter-job |
| XxlJobProperties | XXL-Job 配置属性 | fanyi-spring-boot-starter-job |
| @XxlJob | XXL-Job 任务注解 | xxl-job-core |
| @TenantJob | 多租户任务注解 | fanyi-spring-boot-starter-biz-tenant |
| @Scheduled | Spring 定时任务注解 | Spring Framework |
| BatchedJobService | 批量任务服务 | fanyi-spring-boot-starter-job |
| ContinuousJob | 连续任务接口 | fanyi-spring-boot-starter-job |
3. 配置说明
3.1 XXL-Job 配置
在 application.yaml 中配置 XXL-Job:
yaml
xxl:
job:
# 是否开启,默认为 true
enabled: true
# 访问令牌
access-token: your-access-token
# 调度器配置
admin:
# 调度器地址
addresses: http://127.0.0.1:8080/xxl-job-admin
# 执行器配置
executor:
# 应用名
app-name: fanyi-cloud-executor
# 执行器的 IP
ip:
# 执行器的 Port,-1 表示随机
port: -1
# 日志地址
log-path: /data/applogs/xxl-job/jobhandler
# 日志保留天数,-1 表示永久保留
log-retention-days: 303.2 配置属性说明
| 属性 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
| enabled | Boolean | 否 | true | 是否开启 XXL-Job |
| access-token | String | 否 | - | 访问令牌,用于调度中心和执行器之间的通信认证 |
| admin.addresses | String | 是 | - | 调度器地址,多个地址用逗号分隔 |
| executor.app-name | String | 是 | - | 执行器应用名 |
| executor.ip | String | 否 | - | 执行器 IP,为空则自动获取 |
| executor.port | Integer | 否 | -1 | 执行器端口,-1 表示随机端口 |
| executor.log-path | String | 是 | - | 日志文件存储路径 |
| executor.log-retention-days | Integer | 否 | 30 | 日志保留天数,-1 表示永久保留 |
4. 使用方式
4.1 XXL-Job 任务开发
4.1.1 基础任务
使用 @XxlJob 注解创建 XXL-Job 任务:
java
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
@Component
public class DemoJob {
@XxlJob("demoJob")
public void execute() {
System.out.println("执行定时任务");
}
}4.1.2 带参数的任务
任务可以接收参数:
java
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
@Component
public class TradeOrderAutoCancelJob {
@XxlJob("tradeOrderAutoCancelJob")
public void execute() {
int count = tradeOrderUpdateService.cancelOrderBySystem(100);
System.out.println("过期订单 " + count + " 个");
}
}4.1.3 多租户任务
使用 @TenantJob 注解支持多租户:
java
import com.fanyi.cloud.framework.tenant.core.job.TenantJob;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
@Component
public class TradeOrderAutoReceiveJob {
@XxlJob("tradeOrderAutoReceiveJob")
@TenantJob // 多租户
public String execute() {
int count = tradeOrderUpdateService.receiveOrderBySystem();
return String.format("自动收货 %s 个", count);
}
}4.1.4 带返回值的任务
任务可以返回执行结果:
java
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
@Component
public class CouponExpireJob {
@XxlJob("couponExpireJob")
@TenantJob
public String execute() {
int count = couponService.expireCoupon();
return "过期优惠券 " + count + " 个";
}
}4.1.5 使用 XxlJobHelper
使用 XxlJobHelper 处理任务执行结果:
java
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
@Component
public class AccessLogCleanJob {
@XxlJob("accessLogCleanJob")
public void execute() {
try {
Integer count = apiAccessLogService.cleanAccessLog(14, 100);
log.info("[execute][定时执行清理访问日志数量 ({}) 个]", count);
XxlJobHelper.handleSuccess();
} catch (Exception ex) {
log.error(null, ex);
XxlJobHelper.handleFail();
}
}
}4.2 Spring @Scheduled 任务开发
4.2.1 基础定时任务
使用 @Scheduled 注解创建 Spring 定时任务:
java
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
public class RedisPendingMessageResendJob {
@Scheduled(cron = "35 * * * * ?")
public void messageResend() {
log.info("[messageResend][执行消息重新投递]");
}
}4.2.2 Cron 表达式
Cron 表达式格式:秒 分 时 日 月 周
| 字段 | 允许值 | 允许的特殊字符 |
|---|---|---|
| 秒 | 0-59 | , - * / |
| 分 | 0-59 | , - * / |
| 时 | 0-23 | , - * / |
| 日 | 1-31 | , - * / ? L W |
| 月 | 1-12 | , - * / |
| 周 | 1-7 | , - * / ? L # |
常用 Cron 表达式示例:
| 表达式 | 说明 |
|---|---|
0 0 12 * * ? | 每天 12:00 执行 |
0 15 10 ? * * | 每天 10:15 执行 |
0 15 10 * * ? | 每天 10:15 执行 |
0 15 10 * * ? 2024 | 2024 年每天 10:15 执行 |
0 * 14 * * ? | 每天 14:00-14:59 每分钟执行 |
0 0/5 14 * * ? | 每天 14:00-14:55 每 5 分钟执行 |
0 0/5 14,18 * * ? | 每天 14:00-14:55 和 18:00-18:55 每 5 分钟执行 |
0 0-5 14 * * ? | 每天 14:00-14:05 每分钟执行 |
0 10,44 14 ? 3 WED | 3 月每周三 14:10 和 14:44 执行 |
0 15 10 ? * MON-FRI | 周一到周五 10:15 执行 |
0 15 10 15 * ? | 每月 15 日 10:15 执行 |
0 15 10 L * ? | 每月最后一天 10:15 执行 |
0 15 10 ? * 6L | 每月最后一个周五 10:15 执行 |
0 15 10 ? * 6#3 | 每月第三个周五 10:15 执行 |
35 * * * * ? | 每分钟的第 35 秒执行 |
4.3 批量任务处理
4.3.1 使用 BatchedJobService
BatchedJobService 提供了批量任务处理功能:
java
import com.fanyi.cloud.framework.quartz.service.BatchedJobService;
import com.fanyi.cloud.framework.quartz.service.ContinuousJob;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;
@Component
public class ProductStatisticsJob {
@Resource
private BatchedJobService batchedJobService;
@XxlJob("productStatisticsJob")
public void execute() {
batchedJobService.runSimple("productStatistics", 1, 100, new ContinuousJob<Long>() {
@Override
public Long next(JobContext context, Long lastKey, int sizePerBatch) throws Exception {
List<ProductDO> products = productService.selectList(lastKey, sizePerBatch);
if (products.isEmpty()) {
return null;
}
statisticsService.statistics(products);
return products.get(products.size() - 1).getId();
}
});
}
}4.3.2 参数说明
| 参数 | 类型 | 说明 |
|---|---|---|
| name | String | 任务名称 |
| batchQuantity | Integer | 批次数量 |
| sizePerBatch | Integer | 每批次大小 |
| executor | ContinuousJob | 连续任务执行器 |
4.4 自定义定时任务服务
4.4.1 任务接口定义
java
public interface ScheduledTaskJob {
void run();
}4.4.2 任务实现
java
public class ScheduledTask implements ScheduledTaskJob {
private final Long planId;
private final String creator;
public ScheduledTask(Long planId, String creator) {
this.planId = planId;
this.creator = creator;
}
@Override
public void run() {
String lockKey = "scheduled_task_lock:" + planId;
RLock lock = redissonClient.getLock(lockKey);
try {
if (lock.tryLock()) {
execute();
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
private void execute() {
}
}4.4.3 任务服务接口
java
public interface ScheduledTaskService {
List<ScheduledTaskBean> taskList();
Boolean start(String taskKey);
Boolean stop(String taskKey);
Boolean restart(String taskKey);
void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList);
void addTaskToMap(MaintainPlanDO maintainPlanDO);
}4.4.4 任务服务实现
java
@Service
public class ScheduledTaskServiceImpl implements ScheduledTaskService {
private ReentrantLock lock = new ReentrantLock();
@Resource
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
private Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();
@Override
public Boolean start(String taskKey) {
lock.lock();
try {
ScheduledTaskBean scheduledTask = genTask(taskMapper.selectById(taskKey));
this.doStartTask(scheduledTask);
} finally {
lock.unlock();
}
return true;
}
@Override
public Boolean stop(String taskKey) {
boolean taskStartFlag = scheduledFutureMap.containsKey(taskKey);
if (taskStartFlag) {
ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(taskKey);
scheduledFuture.cancel(true);
scheduledFutureMap.remove(taskKey);
}
return taskStartFlag;
}
@Override
public Boolean restart(String taskKey) {
this.stop(taskKey);
return this.start(taskKey);
}
@Override
public void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList) {
for (ScheduledTaskBean scheduledTask : scheduledTaskBeanList) {
String taskKey = scheduledTask.getTaskKey();
if (this.isStart(taskKey)) {
continue;
}
this.doStartTask(scheduledTask);
}
}
private void doStartTask(ScheduledTaskBean scheduledTask) {
String taskKey = scheduledTask.getTaskKey();
String taskCron = scheduledTask.getTaskCron();
ScheduledTaskJob scheduledTaskJob = new ScheduledTask(scheduledTask.getPlanId(), scheduledTask.getCreator());
ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledTaskJob, new CronTrigger(taskCron));
scheduledFutureMap.put(taskKey, scheduledFuture);
}
}4.4.5 应用启动时初始化任务
java
@Component
@Order(value = 1)
public class ScheduledTaskRunner implements ApplicationRunner {
@Resource
private ScheduledTaskService scheduledTaskService;
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
List<MaintainPlanDO> maintainPlanDOS = taskMapper.selectList(
new LambdaQueryWrapperX<MaintainPlanDO>().eq(MaintainPlanDO::getIsEnabled, true)
);
List<ScheduledTaskBean> scheduledTaskBeanList = new ArrayList<>();
maintainPlanDOS.forEach(maintainPlanDO -> {
ScheduledTaskBean scheduledTaskBean = genTask(maintainPlanDO);
scheduledTaskBeanList.add(scheduledTaskBean);
});
scheduledTaskService.initAllTask(scheduledTaskBeanList);
}
}5. 业务场景示例
5.1 支付订单过期任务
java
@Component
@Slf4j
public class PayOrderExpireJob {
@Resource
private PayOrderService orderService;
@XxlJob("payOrderExpireJob")
public void execute(String param) {
int count = orderService.expireOrder();
log.info("[execute][支付过期 ({}) 个]", count);
}
}5.2 支付通知任务
java
@Component
@Slf4j
@ManagedResource
public class PayNotifyJob {
@Resource
private PayNotifyService payNotifyService;
@XxlJob("payNotifyJob")
public void execute() throws Exception {
int notifyCount = payNotifyService.executeNotify();
log.info("[execute][执行支付通知 ({}) 个]", notifyCount);
}
@ManagedOperation
public boolean opExecute() {
try {
execute();
return true;
} catch (Exception ex) {
log.error(null, ex);
return false;
}
}
}5.3 交易订单自动取消任务
java
@Component
public class TradeOrderAutoCancelJob {
@Resource
private TradeOrderUpdateService tradeOrderUpdateService;
@XxlJob("tradeOrderAutoCancelJob")
public void execute() {
int size = 0;
try {
String jobParam = StringUtils.stripToNull(XxlJobHelper.getJobParam());
if (jobParam != null) {
JsonNode root = new ObjectMapper().readTree(jobParam);
size = root.get("size").asInt();
}
if (size > 0) {
int count = tradeOrderUpdateService.cancelOrderBySystem(size);
XxlJobHelper.handleSuccess(String.format("过期订单 %s 个", count));
}
} catch (JsonProcessingException | IllegalArgumentException ex) {
XxlJobHelper.handleFail("参数错误");
}
}
}5.4 交易订单自动收货任务
java
@Component
public class TradeOrderAutoReceiveJob {
@Resource
private TradeOrderUpdateService tradeOrderUpdateService;
@XxlJob("tradeOrderAutoReceiveJob")
@TenantJob
public String execute() {
int count = tradeOrderUpdateService.receiveOrderBySystem();
return String.format("自动收货 %s 个", count);
}
}5.5 优惠券过期任务
java
@Component
public class CouponExpireJob {
@Resource
private CouponService couponService;
@XxlJob("couponExpireJob")
@TenantJob
public String execute() {
int count = couponService.expireCoupon();
return "过期优惠券 " + count + " 个";
}
}5.6 访问日志清理任务
java
@ManagedResource
@Component
@Slf4j
public class AccessLogCleanJob {
@Resource
private ApiAccessLogService apiAccessLogService;
private static final Integer JOB_CLEAN_RETAIN_DAY = 14;
private static final Integer DELETE_LIMIT = 100;
@XxlJob("accessLogCleanJob")
@TenantIgnore
public void execute() {
if (op()) {
XxlJobHelper.handleSuccess();
} else {
XxlJobHelper.handleFail();
}
}
@ManagedOperation
public boolean op() {
try {
Integer count = apiAccessLogService.cleanAccessLog(JOB_CLEAN_RETAIN_DAY, DELETE_LIMIT);
log.info("[execute][定时执行清理访问日志数量 ({}) 个]", count);
return true;
} catch (Exception ex) {
log.error(null, ex);
return false;
}
}
}5.7 交易统计任务
java
@Component
public class TradeStatisticsJob {
@Resource
private TradeStatisticsService tradeStatisticsService;
@XxlJob("tradeStatisticsJob")
@TenantJob
public String execute(String param) {
param = ObjUtil.defaultIfBlank(param, "1");
if (!NumberUtil.isInteger(param)) {
throw new RuntimeException("交易统计任务的参数只能为是正整数");
}
Integer days = Convert.toInt(param, 0);
if (days < 1) {
throw new RuntimeException("交易统计任务的参数只能为是正整数");
}
String result = tradeStatisticsService.statisticsTrade(days);
return "交易统计:\n" + result;
}
}5.8 Redis 消息重新投递任务
java
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {
private static final String LOCK_KEY = "redis:pending:msg:lock";
private static final int EXPIRE_TIME = 5 * 60;
private final List<AbstractRedisStreamMessageListener<?>> listeners;
private final RedisMQTemplate redisTemplate;
private final String groupName;
private final RedissonClient redissonClient;
@Scheduled(cron = "35 * * * * ?")
public void messageResend() {
RLock lock = redissonClient.getLock(LOCK_KEY);
if (lock.tryLock()) {
try {
execute();
} catch (Exception ex) {
log.error("[messageResend][执行异常]", ex);
} finally {
lock.unlock();
}
}
}
private void execute() {
StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
listeners.forEach(listener -> {
PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
if (pendingMessages.isEmpty()) {
return;
}
pendingMessages.forEach(pendingMessage -> {
long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
if (lastDelivery < EXPIRE_TIME) {
return;
}
List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
if (CollUtil.isEmpty(records)) {
return;
}
redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
.ofObject(records.get(0).getValue())
.withStreamKey(listener.getStreamKey()));
redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
});
});
});
}
}6. 最佳实践
6.1 任务命名规范
- 任务名称使用小写字母和下划线
- 任务名称应该能够清晰描述任务的功能
- 示例:
payOrderExpireJob、tradeOrderAutoCancelJob、accessLogCleanJob
6.2 错误处理
java
@XxlJob("demoJob")
public void execute() {
try {
} catch (Exception ex) {
log.error("[execute][执行异常]", ex);
XxlJobHelper.handleFail(ex.getMessage());
}
}6.3 分布式锁
在分布式环境中使用分布式锁防止重复执行:
java
@XxlJob("demoJob")
public void execute() {
String lockKey = "demo_job_lock";
RLock lock = redissonClient.getLock(lockKey);
try {
if (lock.tryLock()) {
}
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}6.4 任务幂等性
确保任务可以安全地重复执行:
java
@XxlJob("demoJob")
public void execute() {
List<Long> processedIds = new ArrayList<>();
try {
List<OrderDO> orders = orderService.selectPendingOrders();
for (OrderDO order : orders) {
if (order.getStatus() == OrderStatus.PENDING) {
orderService.processOrder(order.getId());
processedIds.add(order.getId());
}
}
XxlJobHelper.handleSuccess("处理订单数量: " + processedIds.size());
} catch (Exception ex) {
log.error("[execute][处理异常]", ex);
XxlJobHelper.handleFail(ex.getMessage());
}
}6.5 日志记录
记录任务的执行情况:
java
@XxlJob("demoJob")
public void execute() {
log.info("[execute][开始执行任务]");
long startTime = System.currentTimeMillis();
try {
int count = orderService.processOrders();
long endTime = System.currentTimeMillis();
log.info("[execute][任务执行完成,处理数量: {},耗时: {}ms]", count, endTime - startTime);
XxlJobHelper.handleSuccess("处理订单数量: " + count);
} catch (Exception ex) {
long endTime = System.currentTimeMillis();
log.error("[execute][任务执行失败,耗时: {}ms]", endTime - startTime, ex);
XxlJobHelper.handleFail(ex.getMessage());
}
}6.6 参数校验
对任务参数进行校验:
java
@XxlJob("demoJob")
public void execute() {
try {
String jobParam = StringUtils.stripToNull(XxlJobHelper.getJobParam());
if (jobParam != null) {
JsonNode root = new ObjectMapper().readTree(jobParam);
int size = root.get("size").asInt();
if (size <= 0) {
throw new IllegalArgumentException("size 必须大于 0");
}
}
} catch (JsonProcessingException | IllegalArgumentException ex) {
XxlJobHelper.handleFail("参数错误: " + ex.getMessage());
}
}6.7 批量处理
对于大数据量的处理,使用批量处理:
java
@XxlJob("demoJob")
public void execute() {
int batchSize = 100;
int offset = 0;
int totalCount = 0;
while (true) {
List<OrderDO> orders = orderService.selectList(offset, batchSize);
if (orders.isEmpty()) {
break;
}
orderService.processOrders(orders);
totalCount += orders.size();
offset += batchSize;
}
log.info("[execute][处理完成,总数: {}]", totalCount);
}6.8 资源释放
确保资源被正确释放:
java
@XxlJob("demoJob")
public void execute() {
Connection connection = null;
try {
connection = dataSource.getConnection();
} catch (Exception ex) {
log.error("[execute][执行异常]", ex);
XxlJobHelper.handleFail(ex.getMessage());
} finally {
if (connection != null) {
try {
connection.close();
} catch (SQLException ex) {
log.error("[execute][关闭连接异常]", ex);
}
}
}
}7. 常见问题
7.1 任务不执行
问题:任务配置正确但不执行
解决方案:
- 检查 XXL-Job 调度中心是否正常运行
- 检查执行器是否成功注册到调度中心
- 检查任务状态是否为"运行中"
- 检查 Cron 表达式是否正确
7.2 任务重复执行
问题:任务被重复执行
解决方案:
- 使用分布式锁防止重复执行
- 检查是否有多个执行器实例
- 确保任务实现幂等性
7.3 任务执行超时
问题:任务执行时间过长导致超时
解决方案:
- 优化任务逻辑,减少执行时间
- 使用批量处理减少单次处理的数据量
- 增加任务超时时间配置
7.4 多租户任务执行问题
问题:多租户任务执行时租户上下文丢失
解决方案:
- 使用
@TenantJob注解 - 确保租户 ID 正确传递
- 检查数据源切换是否正常
7.5 任务失败后不重试
问题:任务失败后没有自动重试
解决方案:
- 在 XXL-Job 调度中心配置重试次数
- 实现任务失败后的重试逻辑
- 使用
XxlJobHelper.handleFail()标记任务失败
8. 监控和告警
8.1 任务监控
通过 XXL-Job 调度中心监控任务执行情况:
- 任务执行次数
- 任务执行成功率
- 任务执行时间
- 任务失败原因
8.2 日志监控
通过日志监控任务执行情况:
java
@XxlJob("demoJob")
public void execute() {
log.info("[execute][开始执行任务]");
try {
int count = orderService.processOrders();
log.info("[execute][任务执行完成,处理数量: {}]", count);
XxlJobHelper.handleSuccess("处理订单数量: " + count);
} catch (Exception ex) {
log.error("[execute][任务执行失败]", ex);
XxlJobHelper.handleFail(ex.getMessage());
}
}8.3 告警配置
配置任务失败告警:
- 在 XXL-Job 调度中心配置邮件告警
- 配置企业微信告警
- 配置钉钉告警
9. 注意事项
- 任务幂等性:确保任务可以安全地重复执行
- 分布式锁:在分布式环境中使用分布式锁防止重复执行
- 错误处理:正确处理任务执行过程中的异常
- 日志记录:记录任务的执行情况,便于问题排查
- 资源释放:确保任务执行过程中使用的资源被正确释放
- 参数校验:对任务参数进行校验,防止非法参数导致任务失败
- 批量处理:对于大数据量的处理,使用批量处理提高性能
- 多租户支持:使用
@TenantJob注解支持多租户 - Cron 表达式:确保 Cron 表达式正确
- 任务命名:使用清晰的命名规范,便于维护
