# 1. 定时任务技术选型
# 1.1 xxl-job简介
一个轻量级分布式任务调度框架 ,主要分为调度中心和执行器两部分,调度中心在启动初始化的时候,会默认生成执行器的RPC代理对象, 执行器项目启动之后,调度中心在触发定时器之后通过jobHandle来调用执行器项目里面的代码。
项目地址:https://github.com/xuxueli/xxl-job/ (opens new window)
xxl-job的中文文档非常详细,强烈建议看官方文档操作。xxl-job官方文档 (opens new window)
# 1.2 Scheduled简介
Spring Boot 中的 @Scheduled 注解为定时任务提供了一种很简单的实现,只需要在注解中加上一些属性,例如 fixedRate、fixedDelay、cron等,并且在启动类上面加上 @EnableScheduling 注解,就可以启动一个定时任务了。
官方文档:https://spring.io/guides/gs/scheduling-tasks/ (opens new window)
# 2. xxl-job分布式任务调度平台
# 2.1 整合xxl-job
环境要求:Maven3+、Jdk1.8+、Mysql5.7+
Step1:去github把项目代码clone下来,下载安装Maven依赖,将建表sql导入MySQL。
doc:项目文档(含离线版中英文文档、架构说明ppt、数据库建表sql)
xxl-job-admin:调度中心
xxl-job-core:公共依赖(用Maven引入即可,这个用不到)
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
2
3
4
5
6
注:sql里包含建库语句,如果已经有数据库了,把那句去掉即可。下面是关于数据表的说明:
- xxl_job_lock:任务调度锁表;
- xxl_job_group:执行器信息表,维护任务执行器信息;
- xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
- xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
- xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
- xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
- xxl_job_user:系统用户表;
2
3
4
5
6
7
8
Step2:配置xxl-job-admin项目的application.properties文件(主要是修改数据库连接,再就是建议设置一下accessToken,其他配置参照官方文档即可),然后启动xxl-job-admin项目,Chrome打开http://localhost:port/xxl-job-admin
地址,即可看到调度中心。
初始账号密码:admin/123456 (登录进去可以在系统里增删用户、修改密码)
Step3:配置xxl-job-executor-sample-springboot项目的application.properties文件,主要修改以下两项:
# 调度中心地址(就刚刚的xxl-job-admin启动地址)
xxl.job.admin.addresses = http://localhost:port/xxl-job-admin
# accessToken值(与调度中心的配置一致即可)
xxl.job.accessToken =
2
3
4
注:IP建议不填,执行器启动后,xxl-job-admin会自动检索,自动注册如果找得到IP,就说明执行器启动成功了,算是个验证吧。
另注:执行器可以整合进自己项目里(把SampleXxlJob和XxlJobConfig加进去即可),也可以单独建个项目,个人建议使用后者。因为可能定时任务模块已经开发好了,可以放到服务器里跑数据了,而项目还出于开发状态,还不便于部署。还有一个原因就是即便是使用shell模式,也必须在服务器放一个执行器(哪怕是空的)才能用,没有的话无法成功执行任务。
# 2.2 开发xxl-job的定时任务
下面运行一下官方提供的Demo,演示如何使用xxl-job开发定时任务,具体使用请查阅官方文档。
Step1:以SampleXxlJob.java的demoJobHandler()为例,这里定时任务代码已经写好了,如果要开发定时任务的话,就在这里面写具体业务逻辑(直接调用对应的Service即可)。
@Component
public class SampleXxlJob {
private static Logger logger = LoggerFactory.getLogger(SampleXxlJob.class);
/**
* 简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World."); // XxlJobHelper.log会记入到执行日志里
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
// default success
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
另注:shell模式的话不用在这里开发,加任务的时候选择shell模式,加完之后直接在调度系统里写脚本即可。
Step2:打开调度中心,首先我们要在任务管理里添加任务,参照如下示例填写即可(官方提供的表里已有该数据),这时还不能执行
Step3:然后打开执行器管理,配置执行器,参照如下示例填写即可(官方提供的表里已有该数据)
注:可以选择自动注册(由于我们在执行器里已经配置了调度中心地址),也可以选择手动导入,建议前者。配置成功后OnLine机器地址处便能查看到执行器地址,这个稍微有一点延迟,如果看不到的话说明之前配置的有问题,需要先去排查一下。
Step4:配置好了执行器之后再去任务管理处执行定时任务即可,执行结果可以在调度日志里查看(可以点开查看详细日志)
# 2.3 xxl-job的服务器部署
Step1:修改配置文件,对调度器和执行器分别用Maven插件打个jar包。
注:打包之前先执行mvn install,否则可能打出来的包小的离谱,可能是没有依赖,根本不能用。
另注:如果调度器和执行器部署到一个服务器上的话,调度器地址留localhost就行。
xxl.job.admin.addresses=http://localhost:8080/xxl-job-admin
Step2:将jar包上传到服务器上执行。
$ java -jar -Duser.timezone=GMT+8 xxl-job-admin.jar
$ java -jar -Duser.timezone=GMT+8 xxl-job-executor.jar
2
注:需要保证服务器上用的是jdk8,版本不一致的话执行器运行时会报错,如下图(本地跑的好好的jar放到服务器上就报错,排查了半天发现是服务器用的jdk11,换成jdk8就好了)
Step3:把启动项目的命令写个shell脚本,加开机自启。调度器加反向代理、HTTPS什么的,看你心情,弄不弄都无所谓了。
脚本的话,执行器和调度器要分开写,写一起的话只会执行第一个。还有个注意的点是,执行器启动要晚于调度器,因此我加了延时。
start_xxl_job_admin.sh
cd /myproject/xxl-job-admin
nohup java -jar -Duser.timezone=GMT+8 xxl-job-admin-2.1.6.RELEASE.jar > xxl-job-admin.log 2>&1 &
2
start_xxl_job_executor.sh
cd /myproject/xxl-job-executor
sleep 3m
nohup java -jar -Duser.timezone=GMT+8 executor-0.0.1-SNAPSHOT.jar > xxl-job-executor.log 2>&1 &
2
3
注意事项:
- nohup加在一个命令的最前面,表示不挂断的运行命令。
- -Duser.timezone=GMT+8表示采用东8区进行启动,否则会出现时间异常,调度器里的cron执行时间和日志记录都会受到影响。
- 2>&1的意思是将标准错误(2)也定向到标准输出(1)的输出文件。
- &加在一个命令的最后面,表示这个命令放在后台执行。
写好脚本之后,赋予其可执行权限,然后输入crontab -e
命令,添加以下内容设置开机自启。
@reboot /myshell/start_xxl_job_admin.sh
@reboot /myshell/start_xxl_job_executor.sh
2
# 3. Scheduled定时任务
# 3.1 整合Scheduled定时任务
要使用Scheduled定时任务,首先需要在启动类添加@EnableScheduling
,启用Spring的计划任务执行功能,这样可以在容器中的任何Spring管理的bean上检测@Scheduled
注解,执行计划任务。该注解的定义如下:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
String cron() default "";
String zone() default "";
long fixedDelay() default -1;
String fixedDelayString() default "";
long fixedRate() default -1;
String fixedRateString() default "";
long initialDelay() default -1;
String initialDelayString() default "";
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 3.2 @Scheduled注解详解
# 3.2.1 @Scheduled参数说明
参数 | 参数说明 | 示例 |
---|---|---|
cron | 任务执行的cron表达式 | 0/1 * * * * ? |
zone | cron表达时解析使用的时区,默认为服务器的本地时区,使用java.util.TimeZone#getTimeZone(String)方法解析 | GMT-8:00 |
fixedDelay | 上一次任务执行结束到下一次执行开始的间隔时间,单位为ms | 1000 |
fixedDelayString | 上一次任务执行结束到下一次执行开始的间隔时间,使用java.time.Duration#parse解析 | PT15M |
fixedRate | 以固定间隔执行任务,即上一次任务执行开始到下一次执行开始的间隔时间,单位为ms,若在调度任务执行时,上一次任务还未执行完毕,会加入worker队列,等待上一次执行完成后立即执行下一次任务 | 2000 |
fixedRateString | 与fixedRate逻辑一致,只是使用java.time.Duration#parse解析 | PT15M |
initialDelay | 首次任务执行的延迟时间 | 1000 |
initialDelayString | 首次任务执行的延迟时间,使用java.time.Duration#parse解析 | PT15M |
# 3.2.2 Crontab 定时任务语法
由于@Scheduled可以执行cron表达式,因此这里也简要说下Crontab定时任务的语法。
# For details see man 4 crontabs
# Example of job definition:
# .---------------- minute (0 - 59)
# | .------------- hour (0 - 23)
# | | .---------- day of month (1 - 31)
# | | | .------- month (1 - 12) OR jan,feb,mar,apr ...
# | | | | .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# | | | | |
# * * * * * user-name command to be executed
定时任务的每段为:分,时,日,月,周,用户,命令
第1列表示分钟1~59 每分钟用*或者 */1表示
第2列表示小时1~23(0表示0点)
第3列表示日期1~31
第4列表示月份1~12
第5列标识号星期0~6(0表示星期天)
第6列要运行的命令
*:表示任意时间都,实际上就是“每”的意思。可以代表00-23小时或者00-12每月或者00-59分
-:表示区间,是一个范围,00 17-19 * * * cmd,就是每天17,18,19点的整点执行命令
,:是分割时段,30 3,19,21 * * * cmd,就是每天凌晨3和晚上19,21点的半点时刻执行命令
/n:表示分割,可以看成除法,*/5 * * * * cmd,每隔五分钟执行一次
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 3.3 用配置项设置定时任务是否开启
在 application.properties 文件里添加配置项
# periodical-task settings
settings.task.enabled=false
2
在定时任务的文件的类上添加如下两个注解
@EnableScheduling
// 配置文件读取是否开启定时任务(与havingValue值相同才会开启)
@ConditionalOnProperty(prefix = "settings.task", name = "enabled", havingValue = "true")
2
3
# 3.4 Scheduled使用多线程的封装示例
定时任务通常是一些非常耗时的后台任务,有时需要使用多线程去提高执行性能。
以下封装示例基于CompletableFuture,见5.2节。
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* 定时任务示例
*/
@Slf4j
@Component
@Async
@EnableScheduling
@ConditionalOnProperty(prefix = "settings.task", name = "enabled", havingValue = "true") // 配置文件读取是否开启定时任务(与havingValue值相同才会开启)
public class TestTask {
/**
* 单线程测试定时任务
*/
@Scheduled(cron = "* * * * * ?")
public void testTask(){
log.info("{}, 测试定时任务执行时间为{}", Thread.currentThread().getName(), new Date());
}
/**
* 多线程测试定时任务
*/
@Scheduled(fixedDelay = 99999999)
public void testMultithreadTask() {
// 开始时间
Long start = System.currentTimeMillis();
// 任务提交顺序
List<String> list = new ArrayList<>();
// 定长20线程池
ExecutorService exs = Executors.newFixedThreadPool(20);
// 待执行的任务列表
final List<Integer> taskList = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
try {
// 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs)
.thenApply(h->Integer.toString(h))
//如需获取任务完成先后顺序,修改此处代码即可
.whenComplete((v, e) -> {
log.info("{}, 测试多线程定时任务执行时间为{}, 正在执行第{}个子任务", Thread.currentThread().getName(), new Date(), v);
log.info("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date());
list.add(v);
})).toArray(CompletableFuture[]::new);
// 等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();
log.info("任务完成先后顺序为" + list + ",总耗时="+(System.currentTimeMillis()-start));
} catch (Exception e) {
e.printStackTrace();
}finally {
exs.shutdown();
}
}
/**
* 组合多个CompletableFuture为一个CompletableFuture,所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
* @param futures List
* @return
*/
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
//1.构造一个空CompletableFuture,子任务数为入参任务list size
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
//2.流式(总任务完成后,每个子任务join取结果,后转换为list)
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
/**
* 线程执行耗时及异常统计
* @param i
* @return
*/
public static Integer calc(Integer i){
try {
Thread.sleep(2000);
log.info("Task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!+"+new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# 4. Java多线程基础
# 4.1 基础概念
# 4.1.1 进程与线程
进程是程序运行和资源分配的基本单位,一个程序至少有一个进程,一个进程至少有一个线程,同一进程中的多个线程之间可以并发执行。
- 进程在执行过程中拥有独立的内存单元,而多个线程共享内存资源,减少切换次数,从而效率更高。
- 线程是进程的一个实体,是CPU调度和分配的基本单位,是比程序更小的能独立运行的基本单位。
# 4.1.2 并行与并发
并行是指两个或者多个事件在同一时刻发生,而并发是指两个或多个事件在同一时间间隔发生。
- 并发:单核CPU运行,多线程时,时间片进行很快的切换,线程轮流执行CPU。
- 并行:多核CPU运行,多线程时,真正的在同一时刻运行。

# 4.2 线程的五种状态
线程通常都有五种状态,创建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)。
[1] 创建状态:线程对象被创建后,就进入了新建状态。
- Thread thread = new Thread(),还没有调用start()方法。
[2] 就绪状态:也被称为“可执行状态”,线程对象被创建后,其它线程调用了该对象的start()方法,从而来启动该线程。
- thread.start(),处于就绪状态的线程,随时可能被CPU调度执行。
[3] 运行状态:线程获取CPU权限进行执行。
- Runnable状态和Running状态可相互切换,因为有可能线程运行一段时间后,有其他高优先级的线程抢占了CPU资源,这时线程就从Running状态变成Runnable状态。
[4] 阻塞状态:是线程因为某种原因放弃CPU使用权,暂时停止运行。直到线程进入就绪状态,才有机会转到运行状态。阻塞的情况分三种:
- 等待阻塞——通过调用线程的wait()方法,让线程等待某工作的完成。
- 同步阻塞——线程在获取synchronized同步锁失败(因为锁被其它线程所占用),它会进入同步阻塞状态。
- 其他阻塞——通过调用线程的sleep()或join()或发出了I/O请求时,线程会进入到阻塞状态。当sleep()状态超时、join()等待线程终止或者超时、或者I/O处理完毕时,线程重新转入就绪状态。
[5] 死亡状态:线程执行完了或者因异常退出了run()方法,该线程结束生命周期。
# 4.3 线程死锁问题
# 4.3.1 死锁问题概述
死锁是操作系统层面的一个错误,是进程死锁的简称,最早在 1965 年由 Dijkstra 在研究银行家算法时提出的,它是计算机操作系统乃至整个并发程序设计领域最难处理的问题之一。
事实上,计算机世界有很多事情需要用多线程方式解决,因为这样才能最大程度上利用资源,体现出计算的高效。但是,实际上来说,计算机系统中有很多一次只能由一个进程使用资源的情况。在多通道程序设计环境中,若干进程往往要共享这类资源,而且一个进程所需要的资源很可能不止一个。因此就会出现若干进程竞争有限资源,又因为推进顺序不当,从而构成无限期循环等待的局面,我们称这种状态为死锁。

简单一点描述,死锁是指多个进程循环等待他方占有的资源而无限期地僵持下去的局面。很显然,如果没有外力的作用,那么死锁涉及到的各个进程都将永远处于封锁状态。系统发生死锁现象不仅浪费大量的系统资源,还会导致整个系统崩溃,带来灾难性后果。所以,对于死锁问题在理论上和技术上都必须予以高度重视。
# 4.3.2 产生死锁的条件
[1] 产生死锁的四个必要条件
- 互斥条件:该资源任意时刻只由一个线程占有
- 请求与保持条件:一个进程因请求资源阻塞时,对已有资源保持不放。
- 不剥夺条件:线程已经获得的资源在未使用完之前不能被其他线程强行剥夺,只有自己使用完才能释放资源
- 循环等待条件:若干进程形成一种头尾相接的循环等待资源关系。
[2] 如何让避免线程死锁?
想要破坏死锁,就是破坏上面四个必要条件之一。
- 破坏互斥条件 :这个条件我们没有办法破坏,因为我们用锁本来就是想让他们互斥的。
- 破坏请求与保持条件: 一次性申请所有资源。
- 破坏不剥夺条件:占用部分资源的线程申请其他资源时,如果申请不到,可以主动释放自己的资源。
- 破坏循环等待条件:靠按序申请来预防,申请资源时按序申请,释放的时候反序。
# 5. 实现Java多线程
# 5.1 实现Java多线程的方式
在Java中,多线程的主要实现方式有以下四种:
- 继承Thread类;
- 实现Runnable接口;
- 实现Callable接口通过FutureTask包装器来创建Thread线程;
- 使用ExecutorService、Callable、Future实现有返回结果的多线程。
其中前两种方式线程执行完后都没有返回值,而后两种是带返回值的。
# 5.1.1 继承Thread类创建线程
Thread类本质上也是实现了Runnable接口的一个实例,代表一个线程的实例。启动线程的唯一方法就是通过Thread类的start()实例方法。start()方法是一个native方法,它将启动一个新线程,并执行run()方法。这种方式实现多线程比较简单,通过继承Thread类并复写run()方法,就可以启动新线程并执行自己定义的run()方法。
CreateThreadDemo1.java
public class CreateThreadDemo1 extends Thread {
public CreateThreadDemo1(String name) {
// 设置当前线程的名字
this.setName(name);
}
@Override
public void run() {
System.out.println("当前运行的线程名为: " + Thread.currentThread().getName());
}
public static void main(String[] args) throws Exception {
// 注意这里,要调用start方法才能启动线程,不能调用run方法
new CreateThreadDemo1("MyThread1").start();
new CreateThreadDemo1("MyThread2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 5.1.2 实现Runnable接口创建线程
由于Java是单继承机制,如果自己的类已经继承自另一个类,则无法再直接继承Thread类,此时,可以通过实现Runnable接口来实现多线程。
实现Runnable接口并实现其中的run方法,然后通过构造Thread实例,传入Runnable实现类,然后调用Thread的start方法即可开启一个新线程。
CreateThreadDemo2.java
public class CreateThreadDemo2 implements Runnable {
@Override
public void run() {
System.out.println("当前运行的线程名为: " + Thread.currentThread().getName());
}
public static void main(String[] args) throws Exception {
CreateThreadDemo2 runnable = new CreateThreadDemo2();
new Thread(runnable, "MyThread1").start();
new Thread(runnable, "MyThread2").start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 5.1.3 实现Callable接口通过FutureTask包装器来创建Thread线程
首先需要一个实现Callable接口的实例,然后实现该接口的唯一方法call逻辑,接着把Callable实例包装成FutureTask传递给Thread实例启动新线程。FutureTask本质上也实现了Runnable接口,所以同样可以用来构造Thread实例。
CreateThreadDemo3.java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class CreateThreadDemo3 {
public static void main(String[] args) throws Exception {
// 创建线程任务,lambada方式实现接口并实现call方法
Callable<Integer> callable = () -> {
System.out.println("线程任务开始执行了...");
Thread.sleep(2000);
return 1;
};
// 将任务封装为FutureTask
FutureTask<Integer> task = new FutureTask<>(callable);
// 开启线程,执行线程任务
new Thread(task).start();
// 这里是在线程启动之后,线程结果返回之前
System.out.println("线程启动之后,线程结果返回之前...");
// 为所欲为完毕之后,拿到线程的执行结果
Integer result = task.get();
System.out.println("主线程中拿到异步任务执行的结果为:" + result);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 5.1.4 使用ExecutorService、Callable、Future实现有返回结果的线程
ExecutorService、Callable、Future三个接口都是属于Executor框架。可返回值的任务必须实现Callable接口。通过ExecutorService执行Callable任务后,可以获取到一个Future的对象,在该对象上调用get()就可以获取到Callable任务返回的结果了。
注意:Future的get方法是阻塞的(即线程无返回结果,get方法会一直等待)。
CreateThreadDemo4.java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CreateThreadDemo4 {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("---- 主程序开始运行 ----");
Date startTime = new Date();
int taskSize = 5;
// 创建一个线程池,Executors提供了创建各种类型线程池的方法,具体详情请自行查阅
ExecutorService executorService = Executors.newFixedThreadPool(taskSize);
// 创建多个有返回值的任务
List<Future> futureList = new ArrayList<Future>();
for (int i = 0; i < taskSize; i++) {
Callable callable = new MyCallable(i);
// 执行任务并获取Future对象
Future future = executorService.submit(callable);
futureList.add(future);
}
// 关闭线程池
executorService.shutdown();
// 获取所有并发任务的运行结果
for (Future future : futureList) {
// 从Future对象上获取任务的返回值,并输出到控制台
System.out.println(">>> " + future.get().toString());
}
Date endTime = new Date();
System.out.println("---- 主程序结束运行 ----,程序运行耗时【" + (endTime.getTime() - startTime.getTime()) + "毫秒】");
}
}
class MyCallable implements Callable<Object> {
private int taskNum;
MyCallable(int taskNum) {
this.taskNum = taskNum;
}
public Object call() throws Exception {
System.out.println(">>> " + taskNum + " 线程任务启动");
Date startTime = new Date();
Thread.sleep(1000);
Date endTime = new Date();
long time = endTime.getTime() - startTime.getTime();
System.out.println(">>> " + taskNum + " 线程任务终止");
return taskNum + "线程任务返回运行结果, 当前任务耗时【" + time + "毫秒】";
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# 5.2 基于线程池的方式实现多线程执行
以下对比多线程并发,取结果并集的几种实现方案,见如下表格(摘自某大佬的对比):
Futrue | FutureTask | CompletionService | CompletableFuture | |
---|---|---|---|---|
原理 | Futrue接口 | 接口RunnableFuture的唯一实现类,RunnableFuture接口继承自Future<V> +Runnable | 内部通过阻塞队列+FutureTask接口 | JDK8实现了Future<T> , CompletionStage<T> 2个接口 |
多任务并发执行 | 支持 | 支持 | 支持 | 支持 |
获取任务结果的顺序 | 支持任务完成先后顺序 | 未知 | 支持任务完成的先后顺序 | 支持任务完成的先后顺序 |
异常捕捉 | 自己捕捉 | 自己捕捉 | 自己捕捉 | 原生API支持,返回每个任务的异常 |
建议 | CPU高速轮询,耗资源,可以使用,但不推荐。 | 功能不对口,并发任务这一块多套一层,不推荐使用。 | 推荐使用,没有JDK8 CompletableFuture之前最好的方案。 | API极端丰富,配合流式编程,速度飞起,推荐使用。 |
以下只介绍原作者最推荐的 CompletableFuture 方式:
CompletableFutureDemo.java
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
/**
* 多线程并发任务,取结果归集
*/
public class CompletableFutureDemo {
/**
* @Description 组合多个CompletableFuture为一个CompletableFuture,所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
* @param futures List
* @return
*/
public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
//1.构造一个空CompletableFuture,子任务数为入参任务list size
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
//2.流式(总任务完成后,每个子任务join取结果,后转换为list)
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}
/**
* 线程执行耗时及异常统计
* @param i
* @return
*/
public static Integer calc(Integer i){
try {
Thread.sleep(1000);
System.out.println("Task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!+"+new Date());
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//结果集
List<String> list = new ArrayList<>();
List<String> list2 = new ArrayList<>();
//定长10线程池
ExecutorService exs = Executors.newFixedThreadPool(10);
final List<Integer> taskList = Lists.newArrayList(2,1,3,4,5,6,7,8,9,10);
try {
// 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs)
.thenApply(h->Integer.toString(h))
//如需获取任务完成先后顺序,修改此处代码即可
.whenComplete((v, e) -> {
System.out.println("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date());
list2.add(v);
})).toArray(CompletableFuture[]::new);
//等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();
System.out.println("任务完成先后顺序,结果list2="+list2+";任务提交顺序,结果list="+list+",耗时="+(System.currentTimeMillis()-start));
} catch (Exception e) {
e.printStackTrace();
}finally {
exs.shutdown();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# 6. 参考资料
[1] SpringBoot定时任务@Scheduled注解详解 from CSDN (opens new window)
[2] Java定时任务框架对比 from AceKel (opens new window)
[3] springboot在配置文件中设置特定功能的启用和停用 from CSDN (opens new window)
[4] 分布式任务调度平台XXL-JOB from 官方文档 (opens new window)
[5] XXL-JOB部署实战 from Bilibili (opens new window)
[6] 多线程并发执行任务,取结果归集。终极总结 from 博客园 (opens new window)
[7] java多线程的6种实现方式详解 from CSDN (opens new window)
[8] Java多线程编程详解 from mikechen (opens new window)
[9] 万字图解Java多线程 from segmentfault (opens new window)
[10] Java多线程 - 创建的三种方式介绍 from CSDN (opens new window)
[11] 操作系统中进程的五种状态与JAVA中线程的六种状态 from CSDN (opens new window)
[12] JAVA系列:线程的5种状态 from 51CTO (opens new window)
[13] Java 高级教程系列 - 死锁示例及解决 from ziheng's Blog (opens new window)
[14] Java实现多线程开发的四种方式,详解它们之间异同 from 华为云 (opens new window)