专业的JAVA编程教程与资源

网站首页 > java教程 正文

JAVA定时器的嵌套实现(java的定时器能不能提供实时保证 可能提前也可能推迟)

temp10 2024-09-09 08:40:07 java教程 9 ℃ 0 评论


最近工作中遇到个问题,业务需求是要接入供应商类似漏洞扫描的设备,获取他们扫描的结果入库。由于扫描是一个漫长的过程所以并不能即时调接口拿到结果,根据供应商的提供的文档发现,提供了任务创建接口,任务进度查询接口,任务结果获取接口,最后觉得使用@Scheduled定时任务,创建任务将任务数据入库,每隔10分钟查询数据库未完成任务,调一次进度接口,进度为100时再去掉结果接口,最后结果入库,更改数据库任务状态为完成。伪代码大致如下:

JAVA定时器的嵌套实现(java的定时器能不能提供实时保证 可能提前也可能推迟)

1.创建任务

public void createTask() {
        // 1. http 调用创建接口,获取任务编号
        // 2. 任务编号,扫描中状态,入库操作
    }

2.定时任务编写

@Scheduled(cron = "0/10 * * * * *")
    public void timeGetTaskRes() {
        // 1. 查询数据库 任务状态是扫描中的数据
        // 2. http调用任务进度接口,获取进度
        // 3. 判断进度是否是100,如果是100,调用结果接口获取结果操作入库,更改数据库任务状态
        int process = 100;
        if (process == 100) {
            // 1.是100,调用结果接口获取结果操作入库
           // 2.更改数据库任务状态
        }
    }

有的同学急了,这不就是个简单的定时任务吗?哪来的定时器嵌套?莫慌,需求这时又来了,说除了要获取任务结果的同时,还要获取这个任务的报告。查看供应商文档得知,这个报告也是同样的一套,创建,进度,获取,并且还必须等扫描任务结束才能创建,又要定时获取进度,到100才能获取模板。业务流程下图:


定时任务优化,下面给出全量代码(模拟业务):

1.定义线程池

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

/**
 *  定时任务的嵌套demo
 */
@Component
public class ScheduleTaskDemoTestConfig {
    /**
     * 定时调度线程池
     * (ScheduledThreadPoolExecutor): 可以周期性地或延迟地执行任务
     */
    @Bean("scheduledExecutorService")
    public ScheduledExecutorService getScheduledExecutorService() {
        return new ScheduledThreadPoolExecutor(5,
                new BasicThreadFactory.Builder().namingPattern("getTemplate-schedulePool-%d").daemon(true).build());
    }

    /**
     * 固定大小线程池(FixedThreadPool)
     * 线程池的大小是固定的,超出核心线程数的任务将被放在无界队列中等待执行。
     */
    @Bean("fixedExecutorService")
    public ExecutorService getFixedThreadPool() {
        // 创建一个包含5个线程的定长线程池
       return new ThreadPoolExecutor(
                // 核心线程数,线程池一直维持的线程数
                5,
                // 最大线程数,线程池允许的最大线程数,这里设置为与核心线程数相同,表示不增加额外线程
                5,
                // 空闲线程存活时间,设置为0表示线程不会被回收
                0L,
                // 时间单位
                TimeUnit.MILLISECONDS,
                // 工作队列,这里是无界队列,容量为Integer.MAX_VALUE,可修改为有界队列控制任务数量
                new LinkedBlockingQueue<>(100),
                // 拒绝策略,当线程池和工作队列已满时,新提交的任务将由调用者线程执行
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

2.定时任务优化编写


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.concurrent.*;

/**
 *  定时任务的嵌套demo
 */
@Component
@Slf4j
public class ScheduleTaskDemoTest {

    /**
     * 注入定时线程池
     */
    @Autowired  // 使用@Qualifier指定了要注入的Bean的名称
    @Qualifier("scheduledExecutorService")
    private ScheduledExecutorService scheduledExecutorService;

    /**
     * 注入定长线程池
     */
    @Autowired
    @Qualifier("fixedExecutorService")
    private ExecutorService fixedExecutorService;

    private  ScheduledFuture<?> scheduledFuture;


    public static void main(String[] args) {
        log.info("调用创建接口 ===> 创建扫描任务接口 ");
    }

    // 定时任务 60s 执行一次
    @Scheduled(cron = "0/60 * * * * ? ")
    public void timeGetTaskRes() {
        log.info("调用扫描进度接口 ===> 获取到扫描进度 scanProcess ");
        // 此处模拟进度100
        int scanProcess = 100;
        if (scanProcess == 100) {
            // 扫描进度是100,调用结果接口获取结果操作入库
            log.info("调用扫描结果接口 ===> 获取到扫描结果 ");
            // 生成模板方法
            handleTemplate();
            log.info("已经获取报告,回到主线程 <====");
        }
    }

    public void handleTemplate() {
        log.info("扫描进度100,调用模板报告创建接口 ===> 创建模板报告接口 ");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> {
            // 2.http 获取模板任务进度
            log.info("调用模板报告进度接口 ===> 获取到扫描进度 templateProcess  ");
            // 3.如果进度是100, 获取报告。 同时关闭定时器;
            // 此处模拟进度100
            int templateProcess = 100;
            if (templateProcess == 100) {
                try {
                    // 扫描进度是100,调用结果接口获取结果操作入库
                    log.info("模板报告度100,调用下载模板报告接口 ===> 获取到下载模板报告 ");
                } catch (Exception e) {
                    log.error("调用下载模板报告接口 ****  下载模板报告出错! ");
                    throw new RuntimeException(e);
                } finally {
                    log.info("已调用下载获取模板接口,关闭当前定时器");
                    countDownLatch.countDown();
                    scheduledFuture.cancel(true);
                }
            }
            // 创建延迟3秒后开始执行,每隔15s在执行
        }, 3, 15, TimeUnit.SECONDS);

        fixedExecutorService.submit(()->{
            try {
                Thread.sleep(10 * 1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                if(!scheduledFuture.isCancelled()) {
                    log.info("调用下载接口超时!关闭当前定时器");
                    countDownLatch.countDown();
                    scheduledFuture.cancel(true);
                }
            }
        });

        try {
            countDownLatch.await();
            log.info("报告获取阻塞等待退出");
        } catch (InterruptedException e) {
            log.info("报告获取阻塞等待停止 出错");
            throw new RuntimeException(e);
        }
    }
}

注意点:

1.上述代码使用了CountDownLatch,由于后续代码需要用到下载的模板报告,因此此处阻塞等待模板下载完毕主线程再继续向下执行;

2.另外又单开了一个普通线程用来执行超时操作,若某个任务模板一直扫描进度很久都未完成,规定一个超时时间,到达超时时间直接抛弃这个模板下载任务;

3.scheduledFuture需要提前声明变量,不然在提交的定时任务内不能使用这个变量;

4.scheduledFuture.cancel(true)使用了这个api,没有使用shutdown()是由于后面还有其他任务会提交到定时任务线程池,shutdown就直接关闭了,无法再提交新的定时任务,因此使用了cancel;

5.再超时线程中使用了isCancled判断是否已经取消,是因为任务还未超时就已经下载好,避免重复去取消当前定时器产生冲突;

6.启动类保证要有@EnableScheduling注解,否则spring定时不会执行;


最后说明,上述定时器间隔,线程池大小,以及超时时间,大家可以根据自己的实际业务进行适当调整,希望能给大家带来启发,有问题也可以在评论区讨论,看到都会回复,感谢观看。

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表