Skip to content

定时任务

1. 概述

定时任务是梵医云系统中用于执行周期性任务的核心模块。系统提供了两种定时任务实现方式:

  1. XXL-Job:分布式任务调度框架,适用于大规模分布式环境
  2. Spring @Scheduled:Spring 自带的定时任务,适用于简单的定时任务

定时任务广泛应用于订单处理、支付通知、数据统计、日志清理、优惠券过期等业务场景。

2. 技术架构

2.1 XXL-Job 架构

XXL-Job 是一个分布式任务调度平台,具有以下特点:

  • 调度中心:负责任务的调度和分发
  • 执行器:负责任务的实际执行
  • 任务:具体的业务逻辑实现

2.2 组件说明

组件说明位置
FanyiXxlJobAutoConfigurationXXL-Job 自动配置类fanyi-spring-boot-starter-job
XxlJobPropertiesXXL-Job 配置属性fanyi-spring-boot-starter-job
@XxlJobXXL-Job 任务注解xxl-job-core
@TenantJob多租户任务注解fanyi-spring-boot-starter-biz-tenant
@ScheduledSpring 定时任务注解Spring Framework
BatchedJobService批量任务服务fanyi-spring-boot-starter-job
ContinuousJob连续任务接口fanyi-spring-boot-starter-job

3. 配置说明

3.1 XXL-Job 配置

application.yaml 中配置 XXL-Job:

yaml
xxl:
  job:
    # 是否开启,默认为 true
    enabled: true
    # 访问令牌
    access-token: your-access-token
    # 调度器配置
    admin:
      # 调度器地址
      addresses: http://127.0.0.1:8080/xxl-job-admin
    # 执行器配置
    executor:
      # 应用名
      app-name: fanyi-cloud-executor
      # 执行器的 IP
      ip: 
      # 执行器的 Port,-1 表示随机
      port: -1
      # 日志地址
      log-path: /data/applogs/xxl-job/jobhandler
      # 日志保留天数,-1 表示永久保留
      log-retention-days: 30

3.2 配置属性说明

属性类型必填默认值说明
enabledBooleantrue是否开启 XXL-Job
access-tokenString-访问令牌,用于调度中心和执行器之间的通信认证
admin.addressesString-调度器地址,多个地址用逗号分隔
executor.app-nameString-执行器应用名
executor.ipString-执行器 IP,为空则自动获取
executor.portInteger-1执行器端口,-1 表示随机端口
executor.log-pathString-日志文件存储路径
executor.log-retention-daysInteger30日志保留天数,-1 表示永久保留

4. 使用方式

4.1 XXL-Job 任务开发

4.1.1 基础任务

使用 @XxlJob 注解创建 XXL-Job 任务:

java
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class DemoJob {

    @XxlJob("demoJob")
    public void execute() {
        System.out.println("执行定时任务");
    }
}

4.1.2 带参数的任务

任务可以接收参数:

java
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class TradeOrderAutoCancelJob {

    @XxlJob("tradeOrderAutoCancelJob")
    public void execute() {
        int count = tradeOrderUpdateService.cancelOrderBySystem(100);
        System.out.println("过期订单 " + count + " 个");
    }
}

4.1.3 多租户任务

使用 @TenantJob 注解支持多租户:

java
import com.fanyi.cloud.framework.tenant.core.job.TenantJob;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class TradeOrderAutoReceiveJob {

    @XxlJob("tradeOrderAutoReceiveJob")
    @TenantJob // 多租户
    public String execute() {
        int count = tradeOrderUpdateService.receiveOrderBySystem();
        return String.format("自动收货 %s 个", count);
    }
}

4.1.4 带返回值的任务

任务可以返回执行结果:

java
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class CouponExpireJob {

    @XxlJob("couponExpireJob")
    @TenantJob
    public String execute() {
        int count = couponService.expireCoupon();
        return "过期优惠券 " + count + " 个";
    }
}

4.1.5 使用 XxlJobHelper

使用 XxlJobHelper 处理任务执行结果:

java
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class AccessLogCleanJob {

    @XxlJob("accessLogCleanJob")
    public void execute() {
        try {
            Integer count = apiAccessLogService.cleanAccessLog(14, 100);
            log.info("[execute][定时执行清理访问日志数量 ({}) 个]", count);
            XxlJobHelper.handleSuccess();
        } catch (Exception ex) {
            log.error(null, ex);
            XxlJobHelper.handleFail();
        }
    }
}

4.2 Spring @Scheduled 任务开发

4.2.1 基础定时任务

使用 @Scheduled 注解创建 Spring 定时任务:

java
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class RedisPendingMessageResendJob {

    @Scheduled(cron = "35 * * * * ?")
    public void messageResend() {
        log.info("[messageResend][执行消息重新投递]");
    }
}

4.2.2 Cron 表达式

Cron 表达式格式:秒 分 时 日 月 周

字段允许值允许的特殊字符
0-59, - * /
0-59, - * /
0-23, - * /
1-31, - * / ? L W
1-12, - * /
1-7, - * / ? L #

常用 Cron 表达式示例:

表达式说明
0 0 12 * * ?每天 12:00 执行
0 15 10 ? * *每天 10:15 执行
0 15 10 * * ?每天 10:15 执行
0 15 10 * * ? 20242024 年每天 10:15 执行
0 * 14 * * ?每天 14:00-14:59 每分钟执行
0 0/5 14 * * ?每天 14:00-14:55 每 5 分钟执行
0 0/5 14,18 * * ?每天 14:00-14:55 和 18:00-18:55 每 5 分钟执行
0 0-5 14 * * ?每天 14:00-14:05 每分钟执行
0 10,44 14 ? 3 WED3 月每周三 14:10 和 14:44 执行
0 15 10 ? * MON-FRI周一到周五 10:15 执行
0 15 10 15 * ?每月 15 日 10:15 执行
0 15 10 L * ?每月最后一天 10:15 执行
0 15 10 ? * 6L每月最后一个周五 10:15 执行
0 15 10 ? * 6#3每月第三个周五 10:15 执行
35 * * * * ?每分钟的第 35 秒执行

4.3 批量任务处理

4.3.1 使用 BatchedJobService

BatchedJobService 提供了批量任务处理功能:

java
import com.fanyi.cloud.framework.quartz.service.BatchedJobService;
import com.fanyi.cloud.framework.quartz.service.ContinuousJob;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

@Component
public class ProductStatisticsJob {

    @Resource
    private BatchedJobService batchedJobService;

    @XxlJob("productStatisticsJob")
    public void execute() {
        batchedJobService.runSimple("productStatistics", 1, 100, new ContinuousJob<Long>() {
            @Override
            public Long next(JobContext context, Long lastKey, int sizePerBatch) throws Exception {
                List<ProductDO> products = productService.selectList(lastKey, sizePerBatch);
                if (products.isEmpty()) {
                    return null;
                }
                statisticsService.statistics(products);
                return products.get(products.size() - 1).getId();
            }
        });
    }
}

4.3.2 参数说明

参数类型说明
nameString任务名称
batchQuantityInteger批次数量
sizePerBatchInteger每批次大小
executorContinuousJob连续任务执行器

4.4 自定义定时任务服务

4.4.1 任务接口定义

java
public interface ScheduledTaskJob {
    void run();
}

4.4.2 任务实现

java
public class ScheduledTask implements ScheduledTaskJob {

    private final Long planId;
    private final String creator;

    public ScheduledTask(Long planId, String creator) {
        this.planId = planId;
        this.creator = creator;
    }

    @Override
    public void run() {
        String lockKey = "scheduled_task_lock:" + planId;
        RLock lock = redissonClient.getLock(lockKey);
        try {
            if (lock.tryLock()) {
                execute();
            }
        } finally {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }

    private void execute() {
    }
}

4.4.3 任务服务接口

java
public interface ScheduledTaskService {

    List<ScheduledTaskBean> taskList();

    Boolean start(String taskKey);

    Boolean stop(String taskKey);

    Boolean restart(String taskKey);

    void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList);

    void addTaskToMap(MaintainPlanDO maintainPlanDO);
}

4.4.4 任务服务实现

java
@Service
public class ScheduledTaskServiceImpl implements ScheduledTaskService {

    private ReentrantLock lock = new ReentrantLock();

    @Resource
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    private Map<String, ScheduledFuture> scheduledFutureMap = new ConcurrentHashMap<>();

    @Override
    public Boolean start(String taskKey) {
        lock.lock();
        try {
            ScheduledTaskBean scheduledTask = genTask(taskMapper.selectById(taskKey));
            this.doStartTask(scheduledTask);
        } finally {
            lock.unlock();
        }
        return true;
    }

    @Override
    public Boolean stop(String taskKey) {
        boolean taskStartFlag = scheduledFutureMap.containsKey(taskKey);
        if (taskStartFlag) {
            ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(taskKey);
            scheduledFuture.cancel(true);
            scheduledFutureMap.remove(taskKey);
        }
        return taskStartFlag;
    }

    @Override
    public Boolean restart(String taskKey) {
        this.stop(taskKey);
        return this.start(taskKey);
    }

    @Override
    public void initAllTask(List<ScheduledTaskBean> scheduledTaskBeanList) {
        for (ScheduledTaskBean scheduledTask : scheduledTaskBeanList) {
            String taskKey = scheduledTask.getTaskKey();
            if (this.isStart(taskKey)) {
                continue;
            }
            this.doStartTask(scheduledTask);
        }
    }

    private void doStartTask(ScheduledTaskBean scheduledTask) {
        String taskKey = scheduledTask.getTaskKey();
        String taskCron = scheduledTask.getTaskCron();
        ScheduledTaskJob scheduledTaskJob = new ScheduledTask(scheduledTask.getPlanId(), scheduledTask.getCreator());
        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledTaskJob, new CronTrigger(taskCron));
        scheduledFutureMap.put(taskKey, scheduledFuture);
    }
}

4.4.5 应用启动时初始化任务

java
@Component
@Order(value = 1)
public class ScheduledTaskRunner implements ApplicationRunner {

    @Resource
    private ScheduledTaskService scheduledTaskService;

    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        List<MaintainPlanDO> maintainPlanDOS = taskMapper.selectList(
            new LambdaQueryWrapperX<MaintainPlanDO>().eq(MaintainPlanDO::getIsEnabled, true)
        );
        List<ScheduledTaskBean> scheduledTaskBeanList = new ArrayList<>();
        maintainPlanDOS.forEach(maintainPlanDO -> {
            ScheduledTaskBean scheduledTaskBean = genTask(maintainPlanDO);
            scheduledTaskBeanList.add(scheduledTaskBean);
        });
        scheduledTaskService.initAllTask(scheduledTaskBeanList);
    }
}

5. 业务场景示例

5.1 支付订单过期任务

java
@Component
@Slf4j
public class PayOrderExpireJob {

    @Resource
    private PayOrderService orderService;

    @XxlJob("payOrderExpireJob")
    public void execute(String param) {
        int count = orderService.expireOrder();
        log.info("[execute][支付过期 ({}) 个]", count);
    }
}

5.2 支付通知任务

java
@Component
@Slf4j
@ManagedResource
public class PayNotifyJob {

    @Resource
    private PayNotifyService payNotifyService;

    @XxlJob("payNotifyJob")
    public void execute() throws Exception {
        int notifyCount = payNotifyService.executeNotify();
        log.info("[execute][执行支付通知 ({}) 个]", notifyCount);
    }

    @ManagedOperation
    public boolean opExecute() {
        try {
            execute();
            return true;
        } catch (Exception ex) {
            log.error(null, ex);
            return false;
        }
    }
}

5.3 交易订单自动取消任务

java
@Component
public class TradeOrderAutoCancelJob {

    @Resource
    private TradeOrderUpdateService tradeOrderUpdateService;

    @XxlJob("tradeOrderAutoCancelJob")
    public void execute() {
        int size = 0;
        try {
            String jobParam = StringUtils.stripToNull(XxlJobHelper.getJobParam());
            if (jobParam != null) {
                JsonNode root = new ObjectMapper().readTree(jobParam);
                size = root.get("size").asInt();
            }
            if (size > 0) {
                int count = tradeOrderUpdateService.cancelOrderBySystem(size);
                XxlJobHelper.handleSuccess(String.format("过期订单 %s 个", count));
            }
        } catch (JsonProcessingException | IllegalArgumentException ex) {
            XxlJobHelper.handleFail("参数错误");
        }
    }
}

5.4 交易订单自动收货任务

java
@Component
public class TradeOrderAutoReceiveJob {

    @Resource
    private TradeOrderUpdateService tradeOrderUpdateService;

    @XxlJob("tradeOrderAutoReceiveJob")
    @TenantJob
    public String execute() {
        int count = tradeOrderUpdateService.receiveOrderBySystem();
        return String.format("自动收货 %s 个", count);
    }
}

5.5 优惠券过期任务

java
@Component
public class CouponExpireJob {

    @Resource
    private CouponService couponService;

    @XxlJob("couponExpireJob")
    @TenantJob
    public String execute() {
        int count = couponService.expireCoupon();
        return "过期优惠券 " + count + " 个";
    }
}

5.6 访问日志清理任务

java
@ManagedResource
@Component
@Slf4j
public class AccessLogCleanJob {

    @Resource
    private ApiAccessLogService apiAccessLogService;

    private static final Integer JOB_CLEAN_RETAIN_DAY = 14;
    private static final Integer DELETE_LIMIT = 100;

    @XxlJob("accessLogCleanJob")
    @TenantIgnore
    public void execute() {
        if (op()) {
            XxlJobHelper.handleSuccess();
        } else {
            XxlJobHelper.handleFail();
        }
    }

    @ManagedOperation
    public boolean op() {
        try {
            Integer count = apiAccessLogService.cleanAccessLog(JOB_CLEAN_RETAIN_DAY, DELETE_LIMIT);
            log.info("[execute][定时执行清理访问日志数量 ({}) 个]", count);
            return true;
        } catch (Exception ex) {
            log.error(null, ex);
            return false;
        }
    }
}

5.7 交易统计任务

java
@Component
public class TradeStatisticsJob {

    @Resource
    private TradeStatisticsService tradeStatisticsService;

    @XxlJob("tradeStatisticsJob")
    @TenantJob
    public String execute(String param) {
        param = ObjUtil.defaultIfBlank(param, "1");
        if (!NumberUtil.isInteger(param)) {
            throw new RuntimeException("交易统计任务的参数只能为是正整数");
        }
        Integer days = Convert.toInt(param, 0);
        if (days < 1) {
            throw new RuntimeException("交易统计任务的参数只能为是正整数");
        }
        String result = tradeStatisticsService.statisticsTrade(days);
        return "交易统计:\n" + result;
    }
}

5.8 Redis 消息重新投递任务

java
@Slf4j
@AllArgsConstructor
public class RedisPendingMessageResendJob {

    private static final String LOCK_KEY = "redis:pending:msg:lock";
    private static final int EXPIRE_TIME = 5 * 60;

    private final List<AbstractRedisStreamMessageListener<?>> listeners;
    private final RedisMQTemplate redisTemplate;
    private final String groupName;
    private final RedissonClient redissonClient;

    @Scheduled(cron = "35 * * * * ?")
    public void messageResend() {
        RLock lock = redissonClient.getLock(LOCK_KEY);
        if (lock.tryLock()) {
            try {
                execute();
            } catch (Exception ex) {
                log.error("[messageResend][执行异常]", ex);
            } finally {
                lock.unlock();
            }
        }
    }

    private void execute() {
        StreamOperations<String, Object, Object> ops = redisTemplate.getRedisTemplate().opsForStream();
        listeners.forEach(listener -> {
            PendingMessagesSummary pendingMessagesSummary = Objects.requireNonNull(ops.pending(listener.getStreamKey(), groupName));
            Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
            pendingMessagesPerConsumer.forEach((consumerName, pendingMessageCount) -> {
                PendingMessages pendingMessages = ops.pending(listener.getStreamKey(), Consumer.from(groupName, consumerName), Range.unbounded(), pendingMessageCount);
                if (pendingMessages.isEmpty()) {
                    return;
                }
                pendingMessages.forEach(pendingMessage -> {
                    long lastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds();
                    if (lastDelivery < EXPIRE_TIME) {
                        return;
                    }
                    List<MapRecord<String, Object, Object>> records = ops.range(listener.getStreamKey(),
                            Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
                    if (CollUtil.isEmpty(records)) {
                        return;
                    }
                    redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord()
                            .ofObject(records.get(0).getValue())
                            .withStreamKey(listener.getStreamKey()));
                    redisTemplate.getRedisTemplate().opsForStream().acknowledge(groupName, records.get(0));
                    log.info("[processPendingMessage][消息({})重新投递成功]", records.get(0).getId());
                });
            });
        });
    }
}

6. 最佳实践

6.1 任务命名规范

  • 任务名称使用小写字母和下划线
  • 任务名称应该能够清晰描述任务的功能
  • 示例:payOrderExpireJobtradeOrderAutoCancelJobaccessLogCleanJob

6.2 错误处理

java
@XxlJob("demoJob")
public void execute() {
    try {
    } catch (Exception ex) {
        log.error("[execute][执行异常]", ex);
        XxlJobHelper.handleFail(ex.getMessage());
    }
}

6.3 分布式锁

在分布式环境中使用分布式锁防止重复执行:

java
@XxlJob("demoJob")
public void execute() {
    String lockKey = "demo_job_lock";
    RLock lock = redissonClient.getLock(lockKey);
    try {
        if (lock.tryLock()) {
        }
    } finally {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}

6.4 任务幂等性

确保任务可以安全地重复执行:

java
@XxlJob("demoJob")
public void execute() {
    List<Long> processedIds = new ArrayList<>();
    try {
        List<OrderDO> orders = orderService.selectPendingOrders();
        for (OrderDO order : orders) {
            if (order.getStatus() == OrderStatus.PENDING) {
                orderService.processOrder(order.getId());
                processedIds.add(order.getId());
            }
        }
        XxlJobHelper.handleSuccess("处理订单数量: " + processedIds.size());
    } catch (Exception ex) {
        log.error("[execute][处理异常]", ex);
        XxlJobHelper.handleFail(ex.getMessage());
    }
}

6.5 日志记录

记录任务的执行情况:

java
@XxlJob("demoJob")
public void execute() {
    log.info("[execute][开始执行任务]");
    long startTime = System.currentTimeMillis();
    try {
        int count = orderService.processOrders();
        long endTime = System.currentTimeMillis();
        log.info("[execute][任务执行完成,处理数量: {},耗时: {}ms]", count, endTime - startTime);
        XxlJobHelper.handleSuccess("处理订单数量: " + count);
    } catch (Exception ex) {
        long endTime = System.currentTimeMillis();
        log.error("[execute][任务执行失败,耗时: {}ms]", endTime - startTime, ex);
        XxlJobHelper.handleFail(ex.getMessage());
    }
}

6.6 参数校验

对任务参数进行校验:

java
@XxlJob("demoJob")
public void execute() {
    try {
        String jobParam = StringUtils.stripToNull(XxlJobHelper.getJobParam());
        if (jobParam != null) {
            JsonNode root = new ObjectMapper().readTree(jobParam);
            int size = root.get("size").asInt();
            if (size <= 0) {
                throw new IllegalArgumentException("size 必须大于 0");
            }
        }
    } catch (JsonProcessingException | IllegalArgumentException ex) {
        XxlJobHelper.handleFail("参数错误: " + ex.getMessage());
    }
}

6.7 批量处理

对于大数据量的处理,使用批量处理:

java
@XxlJob("demoJob")
public void execute() {
    int batchSize = 100;
    int offset = 0;
    int totalCount = 0;
    while (true) {
        List<OrderDO> orders = orderService.selectList(offset, batchSize);
        if (orders.isEmpty()) {
            break;
        }
        orderService.processOrders(orders);
        totalCount += orders.size();
        offset += batchSize;
    }
    log.info("[execute][处理完成,总数: {}]", totalCount);
}

6.8 资源释放

确保资源被正确释放:

java
@XxlJob("demoJob")
public void execute() {
    Connection connection = null;
    try {
        connection = dataSource.getConnection();
    } catch (Exception ex) {
        log.error("[execute][执行异常]", ex);
        XxlJobHelper.handleFail(ex.getMessage());
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                log.error("[execute][关闭连接异常]", ex);
            }
        }
    }
}

7. 常见问题

7.1 任务不执行

问题:任务配置正确但不执行

解决方案

  1. 检查 XXL-Job 调度中心是否正常运行
  2. 检查执行器是否成功注册到调度中心
  3. 检查任务状态是否为"运行中"
  4. 检查 Cron 表达式是否正确

7.2 任务重复执行

问题:任务被重复执行

解决方案

  1. 使用分布式锁防止重复执行
  2. 检查是否有多个执行器实例
  3. 确保任务实现幂等性

7.3 任务执行超时

问题:任务执行时间过长导致超时

解决方案

  1. 优化任务逻辑,减少执行时间
  2. 使用批量处理减少单次处理的数据量
  3. 增加任务超时时间配置

7.4 多租户任务执行问题

问题:多租户任务执行时租户上下文丢失

解决方案

  1. 使用 @TenantJob 注解
  2. 确保租户 ID 正确传递
  3. 检查数据源切换是否正常

7.5 任务失败后不重试

问题:任务失败后没有自动重试

解决方案

  1. 在 XXL-Job 调度中心配置重试次数
  2. 实现任务失败后的重试逻辑
  3. 使用 XxlJobHelper.handleFail() 标记任务失败

8. 监控和告警

8.1 任务监控

通过 XXL-Job 调度中心监控任务执行情况:

  • 任务执行次数
  • 任务执行成功率
  • 任务执行时间
  • 任务失败原因

8.2 日志监控

通过日志监控任务执行情况:

java
@XxlJob("demoJob")
public void execute() {
    log.info("[execute][开始执行任务]");
    try {
        int count = orderService.processOrders();
        log.info("[execute][任务执行完成,处理数量: {}]", count);
        XxlJobHelper.handleSuccess("处理订单数量: " + count);
    } catch (Exception ex) {
        log.error("[execute][任务执行失败]", ex);
        XxlJobHelper.handleFail(ex.getMessage());
    }
}

8.3 告警配置

配置任务失败告警:

  1. 在 XXL-Job 调度中心配置邮件告警
  2. 配置企业微信告警
  3. 配置钉钉告警

9. 注意事项

  1. 任务幂等性:确保任务可以安全地重复执行
  2. 分布式锁:在分布式环境中使用分布式锁防止重复执行
  3. 错误处理:正确处理任务执行过程中的异常
  4. 日志记录:记录任务的执行情况,便于问题排查
  5. 资源释放:确保任务执行过程中使用的资源被正确释放
  6. 参数校验:对任务参数进行校验,防止非法参数导致任务失败
  7. 批量处理:对于大数据量的处理,使用批量处理提高性能
  8. 多租户支持:使用 @TenantJob 注解支持多租户
  9. Cron 表达式:确保 Cron 表达式正确
  10. 任务命名:使用清晰的命名规范,便于维护

10. 相关文档