Quartz 原理

一、Quartz 基本介绍

1.1 Quartz特点

Quartz 具有以下特点:

  1. 强大的调度功能,例如支持丰富多样的调度方法,可以满足各种常规及特殊需求;
  2. 灵活的应用方式,例如支持任务和调度的多种组合方式,支持调度数据的多种存储方式;
  3. 分布式和集群能力,Terracotta 收购后在原来功能基础上作了进一步提升。

​ 另外,作为 Spring 默认的调度框架,Quartz 很容易与 Spring 集成实现灵活可配置的调度功能。

quartz调度核心元素

  1. Scheduler: 任务调度器,是实际执行任务调度的控制器。在spring中通过SchedulerFactoryBean封装起来。
  2. Trigger:触发器,用于定义任务调度的时间规则,有SimpleTrigger,CronTrigger,其中CronTrigger用的比较多。CronTrigger在spring中封装在CronTriggerFactoryBean中。
  3. JobDetail: 用来描述Job实现类及其它相关的静态信息,如Job名字、关联监听器等信息。在spring中有JobDetailFactoryBeanMethodInvokingJobDetailFactoryBean两种实现,如果任务调度只需要执行某个类的某个方法,就可以通过MethodInvokingJobDetailFactoryBean来调用。
  4. Job: 是一个接口,只有一个方法void execute(JobExecutionContext context),开发者实现该接口定义运行任务,JobExecutionContext类提供了调度上下文的各种信息。Job运行时的信息保存在JobDataMap实例中。实现Job接口的任务,默认是无状态的,若要将Job设置成有状态的,在quartz中是给实现的Job添加@DisallowConcurrentExecution注解(以前是实现StatefulJob接口,现在已被Deprecated),在与spring结合中可以在spring配置文件的job detail中配置concurrent参数。
  5. QuartzSchedulerResources:相当于调度的资源存放器,包含了JobStore, ThreadPool等资源,调度都是通过 QuartzSchedulerResources获取相关属性的。

实现一个最简单的 Quartz 定时任务(不支持多机),有几个步骤:

  1. 创建 Job。
  2. 创建 JobBuilder。顾名思义,可以用于生成 JobDetail 。
  3. 创建 TriggerBuilder。作用:配置定时时间,可以用于生成 Trigger 。
  4. 创建 Scheduler。作用:启动定时任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class HelloJob implements Job {
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println("Hello World");
}
public static void main(String[] args) throws Exception {
JobDetail job = JobBuilder.newJob(HelloJob.class).withIdentity("dummyJobName", "group1").build();
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("dummyTriggerName", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?")).build();
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.start();
scheduler.scheduleJob(job, trigger);
}
}

1.2 Quartz 集群配置

quartz集群是通过数据库表来感知其他的应用的,各个节点之间并没有直接的通信。只有使用持久的JobStore才能完成Quartz集群。
数据库表:以前有12张表,现在只有11张表,现在没有存储listener相关的表,多了QRTZ_SIMPROP_TRIGGERS表:

Table nameDescription
QRTZ_CALENDARS存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS存储程序的悲观锁的信息
QRTZ_JOB_DETAILS存储每一个已配置的Job的详细信息
QRTZ_SIMPLE_TRIGGERS存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERSTrigger作为Blob类型存储
QRTZ_TRIGGERS存储已配置的Trigger的信息
QRTZ_SIMPROP_TRIGGERS

QRTZ_LOCKS就是Quartz集群实现同步机制的行锁表,包括以下几个锁:CALENDAR_ACCESSJOB_ACCESSMISFIRE_ACCESSSTATE_ACCESSTRIGGER_ACCESS

二、Quartz 原理及流程

2.1 quartz基本原理

Quartz是通过对用户暴露出Scheduler来进行任务的操作,它可以把任务JobDetail和触发器Trigger加入任务池中,可以把任务删除,也可以把任务停止,scheduler把这些任务和触发器放到一个JobStore中,这里jobStore有内存形式的也有持久化形式的.

它内部会通过一个调度线程QuartzSchedulerThread不断到JobStore中找出下次需要执行的任务,并把这些任务封装放到一个线程池ThreadPool中运行.

在 Quartz 中, scheduler 由 scheduler 工厂创建:DirectSchedulerFactory 或者 StdSchedulerFactory。 第二种工厂 StdSchedulerFactory 使用较多,因为 DirectSchedulerFactory 使用起来不够方便,需要作许多详细的手工编码设置。 Scheduler 主要有三种:RemoteMBeanSchedulerRemoteSchedulerStdScheduler。以最常用的 StdScheduler 为例讲解。

Quartz 核心元素之间的关系如下图所示:

图 1. Quartz 核心元素关系图

线程视图

在 Quartz 中,有两类线程,Scheduler 调度线程和任务执行线程,其中任务执行线程通常使用一个线程池维护一组线程。

图 2. Quartz 线程视图

Scheduler 调度线程主要有两个: 执行常规调度的线程,和执行 misfired trigger 的线程。常规调度线程轮询存储的所有 trigger,如果有需要触发的 trigger,即到达了下一次触发的时间,则从任务执行线程池获取一个空闲线程,执行与该 trigger 关联的任务。Misfire 线程是扫描所有的 trigger,查看是否有 misfired trigger,如果有的话根据 misfire 的策略分别处理。下图描述了这两个线程的基本流程:

图 3. Quartz 调度线程流程图

图 4. Quartz 调度执行时序图

img

2.2 quartz源码分析

  • StdSchedulerFactory.getScheduler()源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public Scheduler getScheduler() throws SchedulerException {
// 读取quartz配置文件,未指定则顺序遍历各个path下的quartz.properties文件
// 解析出quartz配置内容和环境变量,存入PropertiesParser对象
// PropertiesParser组合了Properties(继承Hashtable),定义了一系列对Properties的操作方法,比如getPropertyGroup()批量获取相同前缀的配置。配置内容和环境变量存放在Properties成员变量中
if (cfg == null) {
initialize();
}
// 获取调度器池,采用了单例模式
// 其实,调度器池的核心变量就是一个hashmap,每个元素key是scheduler名,value是scheduler实例
// getInstance()用synchronized防止并发创建
SchedulerRepository schedRep = SchedulerRepository.getInstance();
// 从调度器池中取出当前配置所用的调度器
Scheduler sched = schedRep.lookup(getSchedulerName());
......
// 如果调度器池中没有当前配置的调度器,则实例化一个调度器,主要动作包括:
// 1)初始化threadPool(线程池):开发者可以通过org.quartz.threadPool.class配置指定使用哪个线程池类,比如SimpleThreadPool。先class load线程池类,接着动态生成线程池实例bean,然后通过反射,使用setXXX()方法将以org.quartz.threadPool开头的配置内容赋值给bean成员变量;
// 2)初始化jobStore(任务存储方式):开发者可以通过org.quartz.jobStore.class配置指定使用哪个任务存储类,比如RAMJobStore。先class load任务存储类,接着动态生成实例bean,然后通过反射,使用setXXX()方法将以org.quartz.jobStore开头的配置内容赋值给bean成员变量;
// 3)初始化dataSource(数据源):开发者可以通过org.quartz.dataSource配置指定数据源详情,比如哪个数据库、账号、密码等。jobStore要指定为JDBCJobStore,dataSource才会有效;
// 4)初始化其他配置:包括SchedulerPlugins、JobListeners、TriggerListeners等;
// 5)初始化threadExecutor(线程执行器):默认为DefaultThreadExecutor;
// 6)创建工作线程:根据配置创建N个工作thread,执行start()启动thread,并将N个thread顺序add进threadPool实例的空闲线程列表availWorkers中;
// 7)创建调度器线程:创建QuartzSchedulerThread实例,并通过threadExecutor.execute(实例)启动调度器线程;
// 8)创建调度器:创建StdScheduler实例,将上面所有配置和引用组合进实例中,并将实例存入调度器池中
sched = instantiate();
return sched;
}

上面有个过程是初始化jobStore,表示使用哪种方式存储scheduler相关数据。quartz有两大jobStore:RAMJobStoreJDBCJobStoreRAMJobStore把数据存入内存,性能最高,配置也简单,但缺点是系统挂了难以恢复数据。JDBCJobStore保存数据到数据库,保证数据的可恢复性,但性能较差且配置复杂。

  • QuartzScheduler.scheduleJob(JobDetail, Trigger)源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
// 检查调度器是否开启,如果关闭则throw异常到上层
validateState();
......
// 获取trigger首次触发job的时间,以此时间为起点,每隔一段指定的时间触发job
Date ft = trig.computeFirstFireTime(cal);
if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
}
// 把job和trigger注册进调度器的jobStore
resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
// 通知job监听者
notifySchedulerListenersJobAdded(jobDetail);
// 通知调度器线程
notifySchedulerThread(trigger.getNextFireTime().getTime());
// 通知trigger监听者
notifySchedulerListenersSchduled(trigger);
return ft;
}
  • QuartzScheduler.start()源码
1
2
3
4
5
6
7
public void start() throws SchedulerException {
......
// 这句最关键,作用是使调度器线程跳出一个无限循环,开始轮询所有trigger触发job
// 原理详见“如何采用多线程进行任务调度”
schedThread.togglePause(false);
......
}

如何采用多线程进行任务调度

  • QuartzSchedulerThread.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 调度器线程一旦启动,将一直运行此方法
public void run() {
......
// while()无限循环,每次循环取出时间将到的trigger,触发对应的job,直到调度器线程被关闭
// halted是一个AtomicBoolean类变量,有个volatile int变量value,其get()方法仅仅简单的一句return value != 0,get()返回结果表示调度器线程是否开关
// volatile修饰的变量,存取必须走内存,不能通过cpu缓存,这样一来get总能获得set的最新真实值,因此volatile变量适合用来存放简单的状态信息
// 顾名思义,AtomicBoolean要解决原子性问题,但volatile并不能保证原子性,详见http://blog.csdn.net/wxwzy738/article/details/43238089
while (!halted.get()) {
try {
// check if we're supposed to pause...
// sigLock是个Object对象,被用于加锁同步
// 需要用到wait(),必须加到synchronized块内
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
// 这里会不断循环等待,直到QuartzScheduler.start()调用了togglePause(false)
// 调用wait(),调度器线程进入休眠状态,同时sigLock锁被释放
// togglePause(false)获得sigLock锁,将paused置为false,使调度器线程能够退出此循环,同时执行sigLock.notifyAll()唤醒调度器线程
sigLock.wait(1000L);
} catch (InterruptedException ignore) {}
}
......
}
......
// 如果线程池中的工作线程个数 > 0
if(availThreadCount > 0) {
......
// 获取马上到时间的trigger
// 允许取出的trigger个数不能超过一个阀值,这个阀值是线程池个数与org.quartz.scheduler.batchTriggerAcquisitionMaxCount配置值间的最小者
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
......
// 执行与trigger绑定的job
// shell是JobRunShell对象,实现了Runnable接口
// SimpleThreadPool.runInThread(Runnable)从线程池空闲列表中取出一个工作线程
// 工作线程执行WorkerThread.run(Runnable),详见下方WorkerThread的讲解
if (qsRsrcs.getThreadPool().runInThread(shell) == false) { ...... }
} else {......}
......
} catch(RuntimeException re) {......}
} // while (!halted)
......
}
  • WorkerThread.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void run(Runnable newRunnable) {
synchronized(lock) {
if(runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}
runnable = newRunnable;
lock.notifyAll();
}
}
// 工作线程一旦启动,将一直运行此方法
@Override
public void run() {
boolean ran = false;
// 工作线程一直循环等待job,直到线程被关闭,原理同QuartzSchedulerThread.run()中的halted.get()
while (run.get()) {
try {
// 原理同QuartzSchedulerThread.run()中的synchronized (sigLock)
// 锁住lock,不断循环等待job,当job要被执行时,WorkerThread.run(Runnable)被调用,job运行环境被赋值给runnable
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}
// 开始执行job
if (runnable != null) {
ran = true;
// runnable.run()将触发运行job实现类(比如JobImpl.execute())
runnable.run();
}
}
} catch (InterruptedException unblock) {
......
}
}
......
}

核心代码就是在while循环中调用Object.wait(),等待可以跳出while循环的条件成立,当条件成立时,立马调度Object.notifyAll()使线程跳出while。通过这样的代码,可以实现调度器线程等待启动、工作线程等待job等功能。

参考

quartz原理揭秘和源码解读

quartz (从原理到应用)详解篇

热评文章