Quartz: a casual read through the source

Getting started with the code

2023-01-03

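  1. Reading the code under example in the Quartz source is the quickest way to learn how Quartz is used, and the job, trigger, and scheduler concepts come quickly too: a job is the task code that actually runs; a JobDetail adds extra descriptive information about the job; a trigger holds the timing information, i.e. when the job fires and how it repeats; the scheduler drives the overall triggering and dispatch.
  2. Starting from the example code, step into each method definition one by one, for example the scheduler factory's initialization and the scheduler's start, and fill in the picture bit by bit. The core turns out to sit in a few classes: the scheduler thread (QuartzSchedulerThread) dispatches jobs; the worker thread pool runs them; the job store records the state of jobs and triggers, either in RAM or in a database via JDBC. On each pass, the triggers due within a time window (the latest triggers) are fetched from the store, and the scheduler thread hands them to idle threads in the thread pool.
  3. Quartz appears to support only simple time-based schedules, such as times defined by a cron expression or in Java code. It has no notion of DAG (directed acyclic graph) scheduling and no upstream/downstream dependencies, so business code presumably has to implement those itself.
  4. A task has to implement the Quartz Job interface, so remote invocation does not seem to be supported directly. Presumably the remote call has to be set up inside the job: a worker thread invokes the local job, and the job then triggers the remote task.
  5. Computing the next trigger to fire is a key piece of logic. Each pass fetches a batch of the triggers due soonest from the store, processes them one by one, and then fetches the next batch. My guess is that each trigger's next fire time is recorded in the store; when the trigger fires, the following fire time is computed and saved back, which simplifies the timing model considerably. This is a guess, since I have not read that code in detail, but it is probably designed this way.
  6. Overall, threads and locks are everywhere in the code, and the storage in the job store is also quite tangled, so reading it takes time. A day or two only gets you an introduction; going deeper means testing and tracing in practice.
  7. Even an enterprise-grade scheduling component is full of threads and locks and looks easy to get wrong; it probably became stable only after many iterations. So everyone is in much the same position: once the core path is confirmed to work, fix problems as they come up.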
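
The model guessed at in items 2 and 5 can be sketched in a few lines of plain Java. This is only an illustration under my own assumptions, not Quartz code: `MiniScheduler`, `MiniTrigger`, and `runUntil` are hypothetical names, time is a simulated `long`, and there are no threads or locks at all.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch: a store ordered by next fire time, drained by one loop. */
final class MiniScheduler {

    /** A trigger that fires every intervalMs, at most `remaining` more times. */
    static final class MiniTrigger {
        final String name;
        final long intervalMs;
        long nextFireTime;
        int remaining;

        MiniTrigger(String name, long firstFireTime, long intervalMs, int count) {
            this.name = name;
            this.nextFireTime = firstFireTime;
            this.intervalMs = intervalMs;
            this.remaining = count;
        }
    }

    // Sorted by next fire time, then by name so equal times do not collide.
    private final TreeSet<MiniTrigger> store = new TreeSet<>(
            Comparator.<MiniTrigger>comparingLong(t -> t.nextFireTime)
                      .thenComparing(t -> t.name));

    void schedule(MiniTrigger t) {
        store.add(t);
    }

    /** Fires everything due up to `until` (simulated time) and returns a log. */
    List<String> runUntil(long until) {
        List<String> fired = new ArrayList<>();
        while (!store.isEmpty() && store.first().nextFireTime <= until) {
            MiniTrigger t = store.pollFirst();   // take the earliest trigger
            fired.add(t.name + "@" + t.nextFireTime);
            t.remaining--;
            if (t.remaining > 0) {
                t.nextFireTime += t.intervalMs;  // compute the next fire time
                store.add(t);                    // and store it again
            }
        }
        return fired;
    }
}
```

Because each trigger carries only its next fire time, the loop never has to reason about the whole schedule: it takes the earliest entry, fires it, advances it by one step, and puts it back.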

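Open questions

  • I did not fully understand how Quartz handles clustering. From the documentation, it seems to use JDBC persistence to support multiple running Quartz instances, which looks more like clustering the worker thread pools. Is the core that computes and dispatches schedules clustered as well?
  • For crontab-style jobs, it looks like the next fire time is recorded on each run and saved until it is due?
  • The misfire policy controls whether a missed run is fired immediately to catch up or dropped. If, after a restart, a pile of crontab jobs turn out to be a month overdue, does the misfire policy regenerate and backfill every missed run? Is that backfill sequential, or can it be configured to run asynchronously on multiple threads?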
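
On the misfire question, one way to see why a month of missed runs need not be replayed one by one is to compute the next fire time arithmetically. The sketch below assumes a fixed-interval schedule and two hypothetical policies (fire once now, or do nothing and wait for the next slot). It is not taken from Quartz, and how Quartz actually applies its misfire instructions would need to be checked in the source.

```java
/** Hypothetical sketch of two misfire policies for a fixed-interval schedule. */
final class MisfireSketch {

    enum Policy { FIRE_ONCE_NOW, DO_NOTHING }

    /**
     * Returns the fire time to store once a misfire has been detected.
     *
     * @param missedFireTime the scheduled time that was missed (must be <= now)
     * @param intervalMs     the repeat interval in milliseconds (must be > 0)
     * @param now            the current time
     */
    static long nextFireTimeAfterMisfire(long missedFireTime, long intervalMs,
                                         long now, Policy policy) {
        if (intervalMs <= 0 || missedFireTime > now) {
            throw new IllegalArgumentException("not a misfire");
        }
        if (policy == Policy.FIRE_ONCE_NOW) {
            return now; // one catch-up run, however many slots were missed
        }
        // DO_NOTHING: skip ahead to the first slot strictly after now.
        long slotsToSkip = (now - missedFireTime) / intervalMs + 1;
        return missedFireTime + slotsToSkip * intervalMs;
    }
}
```

Under either policy the result is a single stored fire time, so the cost does not grow with the number of missed runs.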

https://github.com/ltephanysopez/quartz-job-scheduling/tree/master

picture 1

Tracing the key execution code

Starting from the example code

This is the Quartz example code; stepping through it leads to all of the key internal logic.

    log.info("------- Initializing ----------------------");

// First we must get a reference to a scheduler
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();

log.info("------- Initialization Complete -----------");

// computer a time that is on the next round minute
Date runTime = evenMinuteDate(new Date());

log.info("------- Scheduling Job -------------------");

// define the job and tie it to our HelloJob class
JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();

// Trigger the job to run on the next round minute
Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();

// Tell quartz to schedule the job using our trigger
sched.scheduleJob(job, trigger);
log.info(job.getKey() + " will run at: " + runTime);

// Start up the scheduler (nothing can actually run until the
// scheduler has been started)
sched.start();

QuartzScheduler: storing the job and trigger

scheduleJob mainly stores the job and trigger information, and also notifies the scheduler thread.

sched.scheduleJob(job, trigger);
    /**
* <p>
* Add the <code>{@link org.quartz.Job}</code> identified by the given
* <code>{@link org.quartz.JobDetail}</code> to the Scheduler, and
* associate the given <code>{@link org.quartz.Trigger}</code> with it.
* </p>
*
* <p>
* If the given Trigger does not reference any <code>Job</code>, then it
* will be set to reference the Job passed with it into this method.
* </p>
*
* @throws SchedulerException
* if the Job or Trigger cannot be added to the Scheduler, or
* there is an internal Scheduler error.
*/
public Date scheduleJob(JobDetail jobDetail,
Trigger trigger) throws SchedulerException {
validateState();


OperableTrigger trig = (OperableTrigger)trigger;

if (trigger.getJobKey() == null) {
trig.setJobKey(jobDetail.getKey());
} else if (!trigger.getJobKey().equals(jobDetail.getKey())) {
throw new SchedulerException(
"Trigger does not reference given job!");
}

trig.validate();

Calendar cal = null;
if (trigger.getCalendarName() != null) {
cal = resources.getJobStore().retrieveCalendar(trigger.getCalendarName());
}
Date ft = trig.computeFirstFireTime(cal);

if (ft == null) {
throw new SchedulerException(
"Based on configured schedule, the given trigger '" + trigger.getKey() + "' will never fire.");
}

resources.getJobStore().storeJobAndTrigger(jobDetail, trig);
notifySchedulerListenersJobAdded(jobDetail);
notifySchedulerThread(trigger.getNextFireTime().getTime());
notifySchedulerListenersSchduled(trigger);

return ft;
}
  • Storing the job and trigger with JDBC
    /**
* <p>
* Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
* </p>
*
* @param newJob
* The <code>JobDetail</code> to be stored.
* @param newTrigger
* The <code>Trigger</code> to be stored.
* @throws ObjectAlreadyExistsException
* if a <code>Job</code> with the same name/group already
* exists.
*/
public void storeJobAndTrigger(final JobDetail newJob,
final OperableTrigger newTrigger)
throws JobPersistenceException {
executeInLock(
(isLockOnInsert()) ? LOCK_TRIGGER_ACCESS : null,
new VoidTransactionCallback() {
public void executeVoid(Connection conn) throws JobPersistenceException {
storeJob(conn, newJob, false);
storeTrigger(conn, newTrigger, newJob, false,
Constants.STATE_WAITING, false, false);
}
});
}
  • Storing the job and trigger in RAM
    /**
* <p>
* Store the given <code>{@link org.quartz.JobDetail}</code> and <code>{@link org.quartz.Trigger}</code>.
* </p>
*
* @param newJob
* The <code>JobDetail</code> to be stored.
* @param newTrigger
* The <code>Trigger</code> to be stored.
* @throws ObjectAlreadyExistsException
* if a <code>Job</code> with the same name/group already
* exists.
*/
public void storeJobAndTrigger(JobDetail newJob,
OperableTrigger newTrigger) throws JobPersistenceException {
storeJob(newJob, false);
storeTrigger(newTrigger, false);
}


public void storeJob(JobDetail newJob,
boolean replaceExisting) throws ObjectAlreadyExistsException {
JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());

boolean repl = false;

synchronized (lock) {
if (jobsByKey.get(jw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newJob);
}
repl = true;
}

if (!repl) {
// get job group
HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
if (grpMap == null) {
grpMap = new HashMap<JobKey, JobWrapper>(100);
jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
}
// add to jobs by group
grpMap.put(newJob.getKey(), jw);
// add to jobs by FQN map
jobsByKey.put(jw.key, jw);
} else {
// update job detail
JobWrapper orig = jobsByKey.get(jw.key);
orig.jobDetail = jw.jobDetail; // already cloned
}
}
}

public void storeTrigger(OperableTrigger newTrigger,
boolean replaceExisting) throws JobPersistenceException {
TriggerWrapper tw = new TriggerWrapper((OperableTrigger)newTrigger.clone());

synchronized (lock) {
if (triggersByKey.get(tw.key) != null) {
if (!replaceExisting) {
throw new ObjectAlreadyExistsException(newTrigger);
}

removeTrigger(newTrigger.getKey(), false);
}

if (retrieveJob(newTrigger.getJobKey()) == null) {
throw new JobPersistenceException("The job ("
+ newTrigger.getJobKey()
+ ") referenced by the trigger does not exist.");
}

// add to triggers by job
List<TriggerWrapper> jobList = triggersByJob.get(tw.jobKey);
if(jobList == null) {
jobList = new ArrayList<TriggerWrapper>(1);
triggersByJob.put(tw.jobKey, jobList);
}
jobList.add(tw);

// add to triggers by group
HashMap<TriggerKey, TriggerWrapper> grpMap = triggersByGroup.get(newTrigger.getKey().getGroup());
if (grpMap == null) {
grpMap = new HashMap<TriggerKey, TriggerWrapper>(100);
triggersByGroup.put(newTrigger.getKey().getGroup(), grpMap);
}
grpMap.put(newTrigger.getKey(), tw);
// add to triggers by FQN map
triggersByKey.put(tw.key, tw);

if (pausedTriggerGroups.contains(newTrigger.getKey().getGroup())
|| pausedJobGroups.contains(newTrigger.getJobKey().getGroup())) {
tw.state = TriggerWrapper.STATE_PAUSED;
if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_PAUSED_BLOCKED;
}
} else if (blockedJobs.contains(tw.jobKey)) {
tw.state = TriggerWrapper.STATE_BLOCKED;
} else {
timeTriggers.add(tw);
}
}
}

  • notifySchedulerThread wakes QuartzSchedulerThread so that it can re-evaluate the schedule
    protected void notifySchedulerThread(long candidateNewNextFireTime) {
if (isSignalOnSchedulingChange()) {
signaler.signalSchedulingChange(candidateNewNextFireTime);
}
}
    public void signalSchedulingChange(long candidateNewNextFireTime) {
schedThread.signalSchedulingChange(candidateNewNextFireTime);
}

public class QuartzSchedulerThread extends Thread {
...
/**
* <p>
* Signals the main processing loop that a change in scheduling has been
* made - in order to interrupt any sleeping that may be occuring while
* waiting for the fire time to arrive.
* </p>
*
* @param candidateNewNextFireTime the time (in millis) when the newly scheduled trigger
* will fire. If this method is being called do to some other even (rather
* than scheduling a trigger), the caller should pass zero (0).
*/
public void signalSchedulingChange(long candidateNewNextFireTime) {
synchronized(sigLock) {
signaled = true;
signaledNextFireTime = candidateNewNextFireTime;
sigLock.notifyAll();
}
}
}
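
The signaling pattern above is plain `wait`/`notifyAll` on a shared lock. A minimal standalone sketch follows, with hypothetical names (`SignalSketch`, `waitForSignal`) and none of Quartz's pause or halt handling:

```java
/** Hypothetical sketch of a sleeper that can be woken early by a signal. */
final class SignalSketch {
    private final Object sigLock = new Object();
    private boolean signaled = false;

    /** Called by whoever changes the schedule. */
    void signalSchedulingChange() {
        synchronized (sigLock) {
            signaled = true;
            sigLock.notifyAll();
        }
    }

    /**
     * Sleeps for up to maxWaitMs, returning early if a signal arrives.
     *
     * @return true if a signal was seen, false on timeout or interrupt
     */
    boolean waitForSignal(long maxWaitMs) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        synchronized (sigLock) {
            long remaining = maxWaitMs;
            // Re-check the flag in a loop: wait() may wake up spuriously.
            while (!signaled && remaining > 0) {
                try {
                    sigLock.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt visible
                    break;
                }
                remaining = deadline - System.currentTimeMillis();
            }
            boolean seen = signaled;
            signaled = false; // consume the signal
            return seen;
        }
    }
}
```

Checking the flag before waiting is what keeps a signal that arrives early from being lost.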

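QuartzSchedulerThread: fetching the triggers that are about to fire

There is only one such thread. It fetches the triggers that are about to run (the latest triggers) from the RAM or JDBC store behind it.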

/**
* <p>
* The thread responsible for performing the work of firing <code>{@link Trigger}</code>
* s that are registered with the <code>{@link QuartzScheduler}</code>.
* </p>
*
* @see QuartzScheduler
* @see org.quartz.Job
* @see Trigger
*
* @author James House
*/
public class QuartzSchedulerThread extends Thread {

...

/**
* <p>
* Signals the main processing loop to pause at the next possible point.
* </p>
*/
void togglePause(boolean pause) {
synchronized (sigLock) {
paused = pause;

if (paused) {
signalSchedulingChange(0);
} else {
sigLock.notifyAll();
}
}
}

/**
* <p>
* Signals the main processing loop to pause at the next possible point.
* </p>
*/
void halt(boolean wait) {
synchronized (sigLock) {
halted.set(true);

if (paused) {
sigLock.notifyAll();
} else {
signalSchedulingChange(0);
}
}
this.interrupt();

if (wait) {
boolean interrupted = false;
try {
while (true) {
try {
join();
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}

boolean isPaused() {
return paused;
}

/**
* <p>
* Signals the main processing loop that a change in scheduling has been
* made - in order to interrupt any sleeping that may be occuring while
* waiting for the fire time to arrive.
* </p>
*
* @param candidateNewNextFireTime the time (in millis) when the newly scheduled trigger
* will fire. If this method is being called do to some other even (rather
* than scheduling a trigger), the caller should pass zero (0).
*/
public void signalSchedulingChange(long candidateNewNextFireTime) {
synchronized(sigLock) {
signaled = true;
signaledNextFireTime = candidateNewNextFireTime;
sigLock.notifyAll();
}
}

public void clearSignaledSchedulingChange() {
synchronized(sigLock) {
signaled = false;
signaledNextFireTime = 0;
}
}

public boolean isScheduleChanged() {
synchronized(sigLock) {
return signaled;
}
}

public long getSignaledNextFireTime() {
synchronized(sigLock) {
return signaledNextFireTime;
}
}

/**
* <p>
* The main processing loop of the <code>QuartzSchedulerThread</code>.
* </p>
*/
@Override
public void run() {
int acquiresFailed = 0;

while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
}

// reset failure counter when paused, so that we don't
// wait again after unpausing
acquiresFailed = 0;
}

if (halted.get()) {
break;
}
}

// wait a bit, if reading from job store is consistently
// failing (e.g. DB is down or restarting)..
if (acquiresFailed > 1) {
try {
long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
Thread.sleep(delay);
} catch (Exception ignore) {
}
}

int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
synchronized (sigLock) {
if (halted.get()) {
break;
}
}
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

List<OperableTrigger> triggers;

long now = System.currentTimeMillis();

clearSignaledSchedulingChange();
try {
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
acquiresFailed = 0;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
if (acquiresFailed == 0) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
} catch (RuntimeException e) {
if (acquiresFailed == 0) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
if (acquiresFailed < Integer.MAX_VALUE)
acquiresFailed++;
continue;
}

if (triggers != null && !triggers.isEmpty()) {

now = System.currentTimeMillis();
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
}
}
}
synchronized (sigLock) {
if (halted.get()) {
break;
}
}
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}

// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;

// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}

}

for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();

if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}

// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}

JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}

if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}

}

continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}

long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
}
}

} catch(RuntimeException re) {
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)

// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}

The core is this call, which fetches the triggers that are about to fire:

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

RAMJobStore: acquireNextTriggers

The RAM store implementation of acquireNextTriggers:


/**
* <p>
* This class implements a <code>{@link org.quartz.spi.JobStore}</code> that
* utilizes RAM as its storage device.
* </p>
*
* <p>
* As you should know, the ramification of this is that access is extrememly
* fast, but the data is completely volatile - therefore this <code>JobStore</code>
* should not be used if true persistence between program shutdowns is
* required.
* </p>
*
* @author James House
* @author Sharada Jambula
* @author Eric Mueller
*/
public class RAMJobStore implements JobStore {
...
/**
* <p>
* Get a handle to the next trigger to be fired, and mark it as 'reserved'
* by the calling scheduler.
* </p>
*
* @see #releaseAcquiredTrigger(OperableTrigger)
*/
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
synchronized (lock) {
List<OperableTrigger> result = new ArrayList<OperableTrigger>();
Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();
long batchEnd = noLaterThan;

// return empty list if store has no triggers.
if (timeTriggers.size() == 0)
return result;

while (true) {
TriggerWrapper tw;

try {
tw = timeTriggers.first();
if (tw == null)
break;
timeTriggers.remove(tw);
} catch (java.util.NoSuchElementException nsee) {
break;
}

if (tw.trigger.getNextFireTime() == null) {
continue;
}

if (applyMisfire(tw)) {
if (tw.trigger.getNextFireTime() != null) {
timeTriggers.add(tw);
}
continue;
}

if (tw.getTrigger().getNextFireTime().getTime() > batchEnd) {
timeTriggers.add(tw);
break;
}

// If trigger's job is set as @DisallowConcurrentExecution, and it has already been added to result, then
// put it back into the timeTriggers set and continue to search for next trigger.
JobKey jobKey = tw.trigger.getJobKey();
JobDetail job = jobsByKey.get(tw.trigger.getJobKey()).jobDetail;
if (job.isConcurrentExectionDisallowed()) {
if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
excludedTriggers.add(tw);
continue; // go to next trigger in store.
} else {
acquiredJobKeysForNoConcurrentExec.add(jobKey);
}
}

tw.state = TriggerWrapper.STATE_ACQUIRED;
tw.trigger.setFireInstanceId(getFiredTriggerRecordId());
OperableTrigger trig = (OperableTrigger) tw.trigger.clone();
if (result.isEmpty()) {
batchEnd = Math.max(tw.trigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
}
result.add(trig);
if (result.size() == maxCount)
break;
}

// If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.
if (excludedTriggers.size() > 0)
timeTriggers.addAll(excludedTriggers);
return result;
}
}
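
Stripped of misfire handling, concurrency rules, and trigger state, the batching logic above reduces to roughly the following. This is a simplified sketch over bare fire times, not the real method: `BatchAcquireSketch` and `acquireNext` are hypothetical names, `now` is passed in instead of read from the clock, and a `TreeSet<Long>` cannot hold two triggers with the same fire time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical sketch of batch acquisition with a time window. */
final class BatchAcquireSketch {

    /** Removes and returns up to maxCount fire times that fall inside the batch. */
    static List<Long> acquireNext(TreeSet<Long> fireTimes, long noLaterThan,
                                  int maxCount, long timeWindow, long now) {
        List<Long> result = new ArrayList<>();
        long batchEnd = noLaterThan;
        while (!fireTimes.isEmpty() && result.size() < maxCount) {
            long next = fireTimes.first();
            if (next > batchEnd) {
                break; // the earliest remaining trigger is too far away
            }
            fireTimes.pollFirst();
            if (result.isEmpty()) {
                // After the first hit, the batch only extends timeWindow past it.
                batchEnd = Math.max(next, now) + timeWindow;
            }
            result.add(next);
        }
        return result;
    }
}
```

The first acquired trigger narrows the batch: later triggers join only if they fire within `timeWindow` of it, so one batch never mixes triggers that are far apart in time.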

JobStoreSupport: acquireNextTriggers

The JDBC store implementation of acquireNextTriggers:


/**
* <p>
* Contains base functionality for JDBC-based JobStore implementations.
* </p>
*
* @author <a href="mailto:jeff@binaryfeed.org">Jeffrey Wescott</a>
* @author James House
*/
public abstract class JobStoreSupport implements JobStore, Constants {
...
/**
* <p>
* Get a handle to the next N triggers to be fired, and mark them as 'reserved'
* by the calling scheduler.
* </p>
*
* @see #releaseAcquiredTrigger(OperableTrigger)
*/
@SuppressWarnings("unchecked")
public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
throws JobPersistenceException {

String lockName;
if(isAcquireTriggersWithinLock() || maxCount > 1) {
lockName = LOCK_TRIGGER_ACCESS;
} else {
lockName = null;
}
return executeInNonManagedTXLock(lockName,
new TransactionCallback<List<OperableTrigger>>() {
public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
}
},
new TransactionValidator<List<OperableTrigger>>() {
public Boolean validate(Connection conn, List<OperableTrigger> result) throws JobPersistenceException {
try {
List<FiredTriggerRecord> acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId());
Set<String> fireInstanceIds = new HashSet<String>();
for (FiredTriggerRecord ft : acquired) {
fireInstanceIds.add(ft.getFireInstanceId());
}
for (OperableTrigger tr : result) {
if (fireInstanceIds.contains(tr.getFireInstanceId())) {
return true;
}
}
return false;
} catch (SQLException e) {
throw new JobPersistenceException("error validating trigger acquisition", e);
}
}
});
}
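
As far as I can tell, the point of taking `LOCK_TRIGGER_ACCESS` is that two scheduler instances sharing one database must not both acquire the same trigger. As an analogy only, the sketch below replaces the database row with an entry in a `ConcurrentHashMap` and the lock with an atomic compare-and-set from `WAITING` to `ACQUIRED`. The class and method names are hypothetical, and Quartz's actual SQL and locking are not modeled.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch: only one caller may move a trigger out of WAITING. */
final class AcquireRaceSketch {

    enum State { WAITING, ACQUIRED }

    // Stands in for the shared trigger table; the key is a trigger name.
    private final ConcurrentMap<String, State> triggerStates = new ConcurrentHashMap<>();

    void addTrigger(String name) {
        triggerStates.put(name, State.WAITING);
    }

    /** Succeeds for exactly one caller per WAITING trigger. */
    boolean tryAcquire(String name) {
        // Atomic: replaces the value only if it is still WAITING.
        return triggerStates.replace(name, State.WAITING, State.ACQUIRED);
    }
}
```

However many instances race for the same trigger, only the first `tryAcquire` returns true; the rest see a state that is no longer `WAITING` and move on.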

Dispatching jobs from QuartzSchedulerThread

This is the part of QuartzSchedulerThread that hands jobs to the worker thread pool:

JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}

if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}

JobRunShell provides the execution context for a job

/**
* <p>
* JobRunShell instances are responsible for providing the 'safe' environment
* for <code>Job</code> s to run in, and for performing all of the work of
* executing the <code>Job</code>, catching ANY thrown exceptions, updating
* the <code>Trigger</code> with the <code>Job</code>'s completion code,
* etc.
* </p>
*
* <p>
* A <code>JobRunShell</code> instance is created by a <code>JobRunShellFactory</code>
* on behalf of the <code>QuartzSchedulerThread</code> which then runs the
* shell in a thread from the configured <code>ThreadPool</code> when the
* scheduler determines that a <code>Job</code> has been triggered.
* </p>
*/
public class JobRunShell extends SchedulerListenerSupport implements Runnable {
...
public void initialize(QuartzScheduler sched)
throws SchedulerException {
this.qs = sched;

Job job = null;
JobDetail jobDetail = firedTriggerBundle.getJobDetail();

try {
job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
} catch (SchedulerException se) {
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
} catch (Throwable ncdfe) { // such as NoClassDefFoundError
SchedulerException se = new SchedulerException(
"Problem instantiating class '"
+ jobDetail.getJobClass().getName() + "' - ", ncdfe);
sched.notifySchedulerListenersError(
"An error occured instantiating job to be executed. job= '"
+ jobDetail.getKey() + "'", se);
throw se;
}

this.jec = new JobExecutionContextImpl(scheduler, firedTriggerBundle, job);
}
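
`initialize` asks the job factory for a job instance each time a trigger fires, which is why a job class needs a public no-argument constructor. A minimal sketch of such a factory using reflection, assuming hypothetical `Task` and `HelloTask` types in place of Quartz's `Job` (the real factory implementations may do more than this):

```java
/** Hypothetical sketch of a factory that instantiates a task class by reflection. */
final class ReflectiveFactorySketch {

    interface Task {
        String execute();
    }

    public static final class HelloTask implements Task {
        public HelloTask() {
            // public no-argument constructor, required by newTask below
        }

        @Override
        public String execute() {
            return "hello";
        }
    }

    /** Creates a fresh instance on every call. */
    static Task newTask(Class<? extends Task> taskClass) {
        try {
            return taskClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                    "Problem instantiating class '" + taskClass.getName() + "'", e);
        }
    }
}
```

Since every call returns a new object, instance fields on the task do not survive from one execution to the next.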

SimpleThreadPool runs the shell

qsRsrcs.getThreadPool().runInThread(shell) runs the shell in the thread pool:


/**
* <p>
* This is class is a simple implementation of a thread pool, based on the
* <code>{@link org.quartz.spi.ThreadPool}</code> interface.
* </p>
*
* <p>
* <CODE>Runnable</CODE> objects are sent to the pool with the <code>{@link #runInThread(Runnable)}</code>
* method, which blocks until a <code>Thread</code> becomes available.
* </p>
*
* <p>
* The pool has a fixed number of <code>Thread</code>s, and does not grow or
* shrink based on demand.
* </p>
*
* @author James House
* @author Juergen Donnerstag
*/
public class SimpleThreadPool implements ThreadPool {
...
/**
* <p>
* Run the given <code>Runnable</code> object in the next available
* <code>Thread</code>. If while waiting the thread pool is asked to
* shut down, the Runnable is executed immediately within a new additional
* thread.
* </p>
*
* @param runnable
* the <code>Runnable</code> to be added.
*/
public boolean runInThread(Runnable runnable) {
if (runnable == null) {
return false;
}

synchronized (nextRunnableLock) {

handoffPending = true;

// Wait until a worker thread is available
while ((availWorkers.size() < 1) && !isShutdown) {
try {
nextRunnableLock.wait(500);
} catch (InterruptedException ignore) {
}
}

if (!isShutdown) {
WorkerThread wt = (WorkerThread)availWorkers.removeFirst();
busyWorkers.add(wt);
wt.run(runnable);
} else {
// If the thread pool is going down, execute the Runnable
// within a new additional worker thread (no thread from the pool).
WorkerThread wt = new WorkerThread(this, threadGroup,
"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
busyWorkers.add(wt);
workers.add(wt);
wt.start();
}
nextRunnableLock.notifyAll();
handoffPending = false;
}

return true;
}
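
The handoff above blocks the caller until a worker is free. The same behavior can be approximated with standard `java.util.concurrent` classes. This is a sketch under my own assumptions (a `Semaphore` counts free workers, and `BlockingPoolSketch` is a hypothetical name), not how `SimpleThreadPool` is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a fixed pool whose handoff blocks until a worker is free. */
final class BlockingPoolSketch {
    private final ExecutorService workers;
    private final Semaphore free;

    BlockingPoolSketch(int size) {
        workers = Executors.newFixedThreadPool(size);
        free = new Semaphore(size);
    }

    /** Blocks until a worker is free, then hands the task over. */
    boolean runInThread(Runnable task) {
        if (task == null) {
            return false;
        }
        free.acquireUninterruptibly();       // wait for a free worker
        try {
            workers.execute(() -> {
                try {
                    task.run();
                } finally {
                    free.release();          // the worker is available again
                }
            });
        } catch (RejectedExecutionException e) {
            free.release();                  // the pool was already shut down
            return false;
        }
        return true;
    }

    int availableWorkers() {
        return free.availablePermits();
    }

    /** Waits for running tasks to finish; returns false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMs) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Blocking at the handoff means the dispatcher can never queue more work than there are workers, which matches the fixed-size pool described in the Javadoc above.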

The run method of JobRunShell

A thread in the worker pool executes the run method of the submitted Runnable, which is this code in JobRunShell:


public void run() {
qs.addInternalSchedulerListener(this);

try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();

do {

JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();

try {
begin();
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't begin execution.", se);
break;
}

// notify job & trigger listeners...
try {
if (!notifyListenersBeginning(jec)) {
break;
}
} catch(VetoedException ve) {
try {
CompletedExecutionInstruction instCode = trigger.executionComplete(jec, null);
qs.notifyJobStoreJobVetoed(trigger, jobDetail, instCode);

// QTZ-205
// Even if trigger got vetoed, we still needs to check to see if it's the trigger's finalized run or not.
if (jec.getTrigger().getNextFireTime() == null) {
qs.notifySchedulerListenersFinalized(jec.getTrigger());
}

complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error during veto of Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
break;
}

long startTime = System.currentTimeMillis();
long endTime = startTime;

// execute the job
try {
log.debug("Calling execute on job " + jobDetail.getKey());
job.execute(jec);
endTime = System.currentTimeMillis();
} catch (JobExecutionException jee) {
endTime = System.currentTimeMillis();
jobExEx = jee;
getLog().info("Job " + jobDetail.getKey() +
" threw a JobExecutionException: ", jobExEx);
} catch (Throwable e) {
endTime = System.currentTimeMillis();
getLog().error("Job " + jobDetail.getKey() +
" threw an unhandled Exception: ", e);
SchedulerException se = new SchedulerException(
"Job threw an unhandled exception.", e);
qs.notifySchedulerListenersError("Job ("
+ jec.getJobDetail().getKey()
+ " threw an exception.", se);
jobExEx = new JobExecutionException(se, false);
}

jec.setJobRunTime(endTime - startTime);

// notify all job listeners
if (!notifyJobListenersComplete(jec, jobExEx)) {
break;
}

CompletedExecutionInstruction instCode = CompletedExecutionInstruction.NOOP;

// update the trigger
try {
instCode = trigger.executionComplete(jec, jobExEx);
} catch (Exception e) {
// If this happens, there's a bug in the trigger...
SchedulerException se = new SchedulerException(
"Trigger threw an unhandled exception.", e);
qs.notifySchedulerListenersError(
"Please report this error to the Quartz developers.",
se);
}

// notify all trigger listeners
if (!notifyTriggerListenersComplete(jec, instCode)) {
break;
}

// update job/trigger or re-execute job
if (instCode == CompletedExecutionInstruction.RE_EXECUTE_JOB) {
jec.incrementRefireCount();
try {
complete(false);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
}
continue;
}

try {
complete(true);
} catch (SchedulerException se) {
qs.notifySchedulerListenersError("Error executing Job ("
+ jec.getJobDetail().getKey()
+ ": couldn't finalize execution.", se);
continue;
}

qs.notifyJobStoreJobComplete(trigger, jobDetail, instCode);
break;
} while (true);

} finally {
qs.removeInternalSchedulerListener(this);
}
}
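
The `do { ... } while (true)` loop exists so that a job can be executed again when the trigger returns `RE_EXECUTE_JOB`. Reduced to its control flow, and with hypothetical names (`RunShellSketch`, `TaskFailed`, `runWithRefire`) plus a retry cap that the Quartz loop above does not have, it looks roughly like this:

```java
/** Hypothetical sketch of a run loop that re-executes a task on request. */
final class RunShellSketch {

    /** Thrown by a task; `refire` asks the shell to run it again immediately. */
    static final class TaskFailed extends RuntimeException {
        final boolean refire;

        TaskFailed(String message, boolean refire) {
            super(message);
            this.refire = refire;
        }
    }

    /**
     * Runs the task, re-running it while it asks for a refire,
     * for at most maxRefires extra attempts.
     *
     * @return the number of times the task was executed
     */
    static int runWithRefire(Runnable task, int maxRefires) {
        int executions = 0;
        while (true) {
            executions++;
            try {
                task.run();
                return executions;             // normal completion
            } catch (TaskFailed e) {
                boolean again = e.refire && executions <= maxRefires;
                if (!again) {
                    return executions;         // the failure is final
                }
            } catch (RuntimeException e) {
                return executions;             // unexpected error: never refire
            }
        }
    }
}
```

The cap is there only to keep the sketch from looping forever on a task that always asks to be refired.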

Executing the logic in the user-defined job

The actual execution happens here:

job.execute(jec);

job.execute is the method the user implements when defining a job. Compare it with the job definition in the example and you can see that this is where it finally runs.

Getting started with the demo code

example

quartz example 1

  • SimpleExample.java
 
package org.quartz.examples.example1;

import static org.quartz.DateBuilder.evenMinuteDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
* This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in
* Quartz.
*
* @author Bill Kratzer
*/
public class SimpleExample {

public void run() throws Exception {
Logger log = LoggerFactory.getLogger(SimpleExample.class);

log.info("------- Initializing ----------------------");

// First we must get a reference to a scheduler
SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sched = sf.getScheduler();

log.info("------- Initialization Complete -----------");

// computer a time that is on the next round minute
Date runTime = evenMinuteDate(new Date());

log.info("------- Scheduling Job -------------------");

// define the job and tie it to our HelloJob class
JobDetail job = newJob(HelloJob.class).withIdentity("job1", "group1").build();

// Trigger the job to run on the next round minute
Trigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runTime).build();

// Tell quartz to schedule the job using our trigger
sched.scheduleJob(job, trigger);
log.info(job.getKey() + " will run at: " + runTime);

// Start up the scheduler (nothing can actually run until the
// scheduler has been started)
sched.start();

log.info("------- Started Scheduler -----------------");

// wait long enough so that the scheduler as an opportunity to
// run the job!
log.info("------- Waiting 65 seconds... -------------");
try {
// wait 65 seconds to show job
Thread.sleep(65L * 1000L);
// executing...
} catch (Exception e) {
//
}

// shut down the scheduler
log.info("------- Shutting Down ---------------------");
sched.shutdown(true);
log.info("------- Shutdown Complete -----------------");
}

public static void main(String[] args) throws Exception {

SimpleExample example = new SimpleExample();
example.run();

}

}


  • HelloJob.java


/* 
* All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
*/

package org.quartz.examples.example1;

import java.util.Date;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
* <p>
* This is just a simple job that says "Hello" to the world.
* </p>
*
* @author Bill Kratzer
*/
public class HelloJob implements Job {

private static Logger _log = LoggerFactory.getLogger(HelloJob.class);

/**
* <p>
* Empty constructor for job initialization
* </p>
* <p>
* Quartz requires a public empty constructor so that the
* scheduler can instantiate the class whenever it needs.
* </p>
*/
public HelloJob() {
}

/**
* <p>
* Called by the <code>{@link org.quartz.Scheduler}</code> when a
* <code>{@link org.quartz.Trigger}</code> fires that is associated with
* the <code>Job</code>.
* </p>
*
* @throws JobExecutionException
* if there is an exception while executing the job.
*/
public void execute(JobExecutionContext context)
throws JobExecutionException {

// Say Hello to the World and display the date/time
_log.info("Hello World! - " + new Date());
}

}

Other examples

https://github.com/ltephanysopez/quartz-job-scheduling

// Cron trigger: fires every day at 10:42:00
trigger = newTrigger()
    .withIdentity("trigger3", "group1")
    .withSchedule(cronSchedule("0 42 10 * * ?"))
    .forJob(myJobKey)
    .build();

// Simple trigger: repeats every 5 minutes until 22:00:00
trigger = newTrigger()
    .withIdentity("trigger7", "group1")
    .withSchedule(simpleSchedule()
        .withIntervalInMinutes(5)
        .repeatForever())
    .endAt(dateOf(22, 0, 0))
    .build();
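
To make the cron example concrete: Quartz cron fields are ordered seconds, minutes, hours, day-of-month, month, day-of-week, so "0 42 10 * * ?" means 10:42:00 every day. Below is a rough stand-in for the "next fire time" computation of such a daily schedule. It uses only java.time and is not Quartz's CronExpression; `DailyFireTime` and `nextFireTime` are names made up for this sketch.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Stand-in for a daily schedule such as "0 42 10 * * ?"; not Quartz code. */
final class DailyFireTime {

    /** Returns the first occurrence of timeOfDay strictly after 'after'. */
    static LocalDateTime nextFireTime(LocalDateTime after, LocalTime timeOfDay) {
        LocalDateTime candidate = after.toLocalDate().atTime(timeOfDay);
        // If today's slot has already passed (or is exactly now), roll over to tomorrow.
        return candidate.isAfter(after) ? candidate : candidate.plusDays(1);
    }

    public static void main(String[] args) {
        LocalTime slot = LocalTime.of(10, 42, 0); // hour=10, minute=42, second=0
        System.out.println(nextFireTime(LocalDateTime.of(2024, 1, 4, 9, 0), slot));
        System.out.println(nextFireTime(LocalDateTime.of(2024, 1, 4, 10, 42), slot));
    }
}
```

The first call lands on the same day, the second rolls over to the next day because the slot is not strictly after the reference time.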

Thread lock question


class WorkerThread extends Thread {

private final Object lock = new Object();

// A flag that signals the WorkerThread to terminate.
private AtomicBoolean run = new AtomicBoolean(true);

private SimpleThreadPool tp;

private Runnable runnable = null;

public void run(Runnable newRunnable) {
synchronized(lock) {
if(runnable != null) {
throw new IllegalStateException("Already running a Runnable!");
}

runnable = newRunnable;
lock.notifyAll();
}
}

/**
* <p>
* Loop, executing targets as they are received.
* </p>
*/
@Override
public void run() {
boolean ran = false;

while (run.get()) {
try {
synchronized(lock) {
while (runnable == null && run.get()) {
lock.wait(500);
}

if (runnable != null) {
ran = true;
runnable.run();
}
}
} catch (InterruptedException unblock) {
// do nothing (loop will terminate if shutdown() was called
try {
getLog().error("Worker thread was interrupt()'ed.", unblock);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} catch (Throwable exceptionInRunnable) {
try {
getLog().error("Error while executing the Runnable: ",
exceptionInRunnable);
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
} finally {
synchronized(lock) {
runnable = null;
}
// repair the thread in case the runnable mucked it up...
if(getPriority() != tp.getThreadPriority()) {
setPriority(tp.getThreadPriority());
}

if (runOnce) {
run.set(false);
clearFromBusyWorkersList(this);
} else if(ran) {
ran = false;
makeAvailable(this);
}

}
}

//if (log.isDebugEnabled())
try {
getLog().debug("WorkerThread is shut down.");
} catch(Exception e) {
// ignore to help with a tomcat glitch
}
}
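
The excerpt above is a single-slot handoff built on wait/notifyAll. The part that is easy to miss is that lock.wait(500) releases the monitor while the worker is parked, so the dispatching thread can enter synchronized(lock) in run(Runnable), fill the slot, and wake the worker. Here is a stripped-down sketch of the same pattern. `HandoffWorker`, `submit`, `shutdown`, and `demo` are names invented for this sketch, and the thread pool bookkeeping (makeAvailable, priorities, logging) is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Simplified single-slot handoff; HandoffWorker is a made-up name, not a Quartz class. */
final class HandoffWorker extends Thread {
    private final Object lock = new Object();
    private Runnable task;           // guarded by lock
    private boolean running = true;  // guarded by lock

    /** Dispatcher side: fill the slot and wake the parked worker. */
    void submit(Runnable r) {
        synchronized (lock) {
            if (task != null) {
                throw new IllegalStateException("Already running a Runnable!");
            }
            task = r;
            lock.notifyAll();
        }
    }

    /** Ask the worker loop to exit once nothing is pending. */
    void shutdown() {
        synchronized (lock) {
            running = false;
            lock.notifyAll();
        }
    }

    @Override
    public void run() {
        synchronized (lock) {
            while (true) {
                while (task == null && running) {
                    try {
                        lock.wait(500); // wait() releases the monitor while parked
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (task == null) {
                    return; // woken by shutdown() with nothing pending
                }
                try {
                    task.run();  // like the excerpt, runs while holding the lock
                } finally {
                    task = null; // free the slot for the next submit()
                }
            }
        }
    }

    /** Runs one task through the worker and reports whether it ran and the worker stopped. */
    static boolean demo() {
        HandoffWorker w = new HandoffWorker();
        w.start();
        CountDownLatch done = new CountDownLatch(1);
        w.submit(done::countDown);
        try {
            boolean ran = done.await(5, TimeUnit.SECONDS);
            w.shutdown();
            w.join(5000);
            return ran && !w.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("handoff ok: " + demo());
    }
}
```

Because the task runs inside the synchronized block, a second submit() simply blocks until the current task finishes and the slot is cleared.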


ChatGPT is impressive. Questions like this used to take a lot of digging, or a chat with a colleague, before I could get a quick answer.

picture 0

References

The official documentation is fairly thin and says little about the architecture.

https://www.quartz-scheduler.org/

https://www.quartz-scheduler.org/documentation/quartz-2.3.0/

  • Baeldung's introduction to Quartz

Introduction to Quartz

https://www.baeldung.com/quartz

Very good. Reading it is enough to get started.

  • Baeldung's Quartz with Spring

Scheduling in Spring with Quartz https://www.baeldung.com/spring-quartz-schedule

  • Quartz architecture notes on GitHub

quartz-explained

https://github.com/nkcoder/quartz-explained

https://www.jianshu.com/p/f466e02e2e94

  • Quartz source code analysis

https://blog.csdn.net/CringKong/article/details/89290582

https://blog.csdn.net/CringKong/article/details/89423021

Database connection test

quartz.properties configuration


#============================================================================
# Configure Main Scheduler Properties
#============================================================================

org.quartz.scheduler.instanceName: TestScheduler
org.quartz.scheduler.instanceId: instance_two

#============================================================================
# Configure ThreadPool
#============================================================================

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 5
org.quartz.threadPool.threadPriority: 5

#============================================================================
# Configure JobStore
#============================================================================

org.quartz.jobStore.misfireThreshold: 60000

org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.dataSource=myDS
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true

#============================================================================
# Configure Datasources
#============================================================================

org.quartz.dataSource.myDS.driver: com.mysql.cj.jdbc.Driver
org.quartz.dataSource.myDS.URL: jdbc:mysql://mysql.demo.com:8306/abc
org.quartz.dataSource.myDS.user: user
org.quartz.dataSource.myDS.password: fakePassword
org.quartz.dataSource.myDS.maxConnections: 5
org.quartz.dataSource.myDS.validationQuery: select 0
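
The file above mixes `key: value` and `key=value` lines. Assuming it is read through java.util.Properties, both separators are accepted, and only the first separator on a line ends the key, so the colons inside the JDBC URL are harmless. A small check of that assumption (`SeparatorCheck` and `parse` are names made up for this sketch):

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Checks how java.util.Properties treats ':' and '=' separators; not Quartz code. */
final class SeparatorCheck {

    /** Parses properties text the same way Properties.load would read a file. */
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties p = parse(
            "org.quartz.threadPool.threadCount: 5\n"
            + "org.quartz.jobStore.isClustered=true\n"
            + "org.quartz.dataSource.myDS.URL: jdbc:mysql://mysql.demo.com:8306/abc\n");
        System.out.println(p.getProperty("org.quartz.threadPool.threadCount"));
        System.out.println(p.getProperty("org.quartz.jobStore.isClustered"));
        System.out.println(p.getProperty("org.quartz.dataSource.myDS.URL"));
    }
}
```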

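The database has to be initialized beforehand, otherwise Quartz fails with an error saying the LOCKS table does not exist. The initialization SQL comes from the Quartz source.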


DROP TABLE IF EXISTS QRTZ_FIRED_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_PAUSED_TRIGGER_GRPS;
DROP TABLE IF EXISTS QRTZ_SCHEDULER_STATE;
DROP TABLE IF EXISTS QRTZ_LOCKS;
DROP TABLE IF EXISTS QRTZ_SIMPLE_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_SIMPROP_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_CRON_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_BLOB_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_TRIGGERS;
DROP TABLE IF EXISTS QRTZ_JOB_DETAILS;
DROP TABLE IF EXISTS QRTZ_CALENDARS;

CREATE TABLE QRTZ_JOB_DETAILS(
SCHED_NAME VARCHAR(120) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
JOB_CLASS_NAME VARCHAR(250) NOT NULL,
IS_DURABLE VARCHAR(1) NOT NULL,
IS_NONCONCURRENT VARCHAR(1) NOT NULL,
IS_UPDATE_DATA VARCHAR(1) NOT NULL,
REQUESTS_RECOVERY VARCHAR(1) NOT NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
JOB_NAME VARCHAR(190) NOT NULL,
JOB_GROUP VARCHAR(190) NOT NULL,
DESCRIPTION VARCHAR(250) NULL,
NEXT_FIRE_TIME BIGINT(13) NULL,
PREV_FIRE_TIME BIGINT(13) NULL,
PRIORITY INTEGER NULL,
TRIGGER_STATE VARCHAR(16) NOT NULL,
TRIGGER_TYPE VARCHAR(8) NOT NULL,
START_TIME BIGINT(13) NOT NULL,
END_TIME BIGINT(13) NULL,
CALENDAR_NAME VARCHAR(190) NULL,
MISFIRE_INSTR SMALLINT(2) NULL,
JOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,JOB_NAME,JOB_GROUP)
REFERENCES QRTZ_JOB_DETAILS(SCHED_NAME,JOB_NAME,JOB_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPLE_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
REPEAT_COUNT BIGINT(7) NOT NULL,
REPEAT_INTERVAL BIGINT(12) NOT NULL,
TIMES_TRIGGERED BIGINT(10) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CRON_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
CRON_EXPRESSION VARCHAR(120) NOT NULL,
TIME_ZONE_ID VARCHAR(80),
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SIMPROP_TRIGGERS
(
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
STR_PROP_1 VARCHAR(512) NULL,
STR_PROP_2 VARCHAR(512) NULL,
STR_PROP_3 VARCHAR(512) NULL,
INT_PROP_1 INT NULL,
INT_PROP_2 INT NULL,
LONG_PROP_1 BIGINT NULL,
LONG_PROP_2 BIGINT NULL,
DEC_PROP_1 NUMERIC(13,4) NULL,
DEC_PROP_2 NUMERIC(13,4) NULL,
BOOL_PROP_1 VARCHAR(1) NULL,
BOOL_PROP_2 VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_BLOB_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
BLOB_DATA BLOB NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP),
INDEX (SCHED_NAME,TRIGGER_NAME, TRIGGER_GROUP),
FOREIGN KEY (SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP)
REFERENCES QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_CALENDARS (
SCHED_NAME VARCHAR(120) NOT NULL,
CALENDAR_NAME VARCHAR(190) NOT NULL,
CALENDAR BLOB NOT NULL,
PRIMARY KEY (SCHED_NAME,CALENDAR_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_PAUSED_TRIGGER_GRPS (
SCHED_NAME VARCHAR(120) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
PRIMARY KEY (SCHED_NAME,TRIGGER_GROUP))
ENGINE=InnoDB;

CREATE TABLE QRTZ_FIRED_TRIGGERS (
SCHED_NAME VARCHAR(120) NOT NULL,
ENTRY_ID VARCHAR(95) NOT NULL,
TRIGGER_NAME VARCHAR(190) NOT NULL,
TRIGGER_GROUP VARCHAR(190) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
FIRED_TIME BIGINT(13) NOT NULL,
SCHED_TIME BIGINT(13) NOT NULL,
PRIORITY INTEGER NOT NULL,
STATE VARCHAR(16) NOT NULL,
JOB_NAME VARCHAR(190) NULL,
JOB_GROUP VARCHAR(190) NULL,
IS_NONCONCURRENT VARCHAR(1) NULL,
REQUESTS_RECOVERY VARCHAR(1) NULL,
PRIMARY KEY (SCHED_NAME,ENTRY_ID))
ENGINE=InnoDB;

CREATE TABLE QRTZ_SCHEDULER_STATE (
SCHED_NAME VARCHAR(120) NOT NULL,
INSTANCE_NAME VARCHAR(190) NOT NULL,
LAST_CHECKIN_TIME BIGINT(13) NOT NULL,
CHECKIN_INTERVAL BIGINT(13) NOT NULL,
PRIMARY KEY (SCHED_NAME,INSTANCE_NAME))
ENGINE=InnoDB;

CREATE TABLE QRTZ_LOCKS (
SCHED_NAME VARCHAR(120) NOT NULL,
LOCK_NAME VARCHAR(40) NOT NULL,
PRIMARY KEY (SCHED_NAME,LOCK_NAME))
ENGINE=InnoDB;

CREATE INDEX IDX_QRTZ_J_REQ_RECOVERY ON QRTZ_JOB_DETAILS(SCHED_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_J_GRP ON QRTZ_JOB_DETAILS(SCHED_NAME,JOB_GROUP);

CREATE INDEX IDX_QRTZ_T_J ON QRTZ_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_JG ON QRTZ_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_T_C ON QRTZ_TRIGGERS(SCHED_NAME,CALENDAR_NAME);
CREATE INDEX IDX_QRTZ_T_G ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_T_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_N_G_STATE ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_GROUP,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NEXT_FIRE_TIME ON QRTZ_TRIGGERS(SCHED_NAME,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST ON QRTZ_TRIGGERS(SCHED_NAME,TRIGGER_STATE,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_STATE);
CREATE INDEX IDX_QRTZ_T_NFT_ST_MISFIRE_GRP ON QRTZ_TRIGGERS(SCHED_NAME,MISFIRE_INSTR,NEXT_FIRE_TIME,TRIGGER_GROUP,TRIGGER_STATE);

CREATE INDEX IDX_QRTZ_FT_TRIG_INST_NAME ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME);
CREATE INDEX IDX_QRTZ_FT_INST_JOB_REQ_RCVRY ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,INSTANCE_NAME,REQUESTS_RECOVERY);
CREATE INDEX IDX_QRTZ_FT_J_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_JG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,JOB_GROUP);
CREATE INDEX IDX_QRTZ_FT_T_G ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_NAME,TRIGGER_GROUP);
CREATE INDEX IDX_QRTZ_FT_TG ON QRTZ_FIRED_TRIGGERS(SCHED_NAME,TRIGGER_GROUP);

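After running simple and cron jobs, it turns out that almost all the key information lives in the triggers table.

  • Each trigger has a corresponding row, which mainly records the start time and the next fire time. After every run the next fire time is recomputed automatically, which is how long-running schedules are tracked. Once a trigger has finished, its row is removed from the table automatically and no record is left behind.
  • If a trigger has not finished, or is a long-lived cron job, and the Quartz process shuts down, the next startup fails with an error saying the trigger and job already exist and cannot be added to the scheduler again.
  • A simple trigger carries information such as the repeat count, but I could not find it in the QRTZ_TRIGGERS rows, and END_TIME is 0, so how does Quartz decide when to stop? If it were only in memory it would be lost on restart. Maybe it is in the job data of the job detail? Judging by the schema above, QRTZ_SIMPLE_TRIGGERS with its REPEAT_COUNT and TIMES_TRIGGERED columns looks like the more likely place.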
  • quartz-triggers
SCHED_NAME   |TRIGGER_NAME|TRIGGER_GROUP|JOB_NAME|JOB_GROUP|DESCRIPTION|NEXT_FIRE_TIME|PREV_FIRE_TIME|PRIORITY|TRIGGER_STATE|TRIGGER_TYPE|START_TIME   |END_TIME|CALENDAR_NAME|MISFIRE_INSTR|JOB_DATA|
-------------|------------|-------------|--------|---------|-----------|--------------|--------------|--------|-------------|------------|-------------|--------|-------------|-------------|--------|
TestScheduler|trigger1 |group1 |job1 |group1 | | 1704356160000| -1| 5|WAITING |SIMPLE |1704356160000| 0| | 0| |
TestScheduler|trigger2 |group1 |job2 |group1 | | 1704355180000| 1704355160000| 5|WAITING |CRON |1704354261000| 0| | 0| |
TestScheduler|trigger4 |group1 |job4 |group1 | | 1704355167257| 1704355157257| 5|WAITING |SIMPLE |1704354997257| 0| | 0| |
  • quartz-jobs
SCHED_NAME   |JOB_NAME|JOB_GROUP|DESCRIPTION|JOB_CLASS_NAME                        |IS_DURABLE|IS_NONCONCURRENT|IS_UPDATE_DATA|REQUESTS_RECOVERY|JOB_DATA                                        |
-------------|--------|---------|-----------|--------------------------------------|----------|----------------|--------------|-----------------|------------------------------------------------|
TestScheduler|job1    |group1   |           |org.quartz.examples.example1.HelloJob |0         |0               |0             |0                |(serialized org.quartz.JobDataMap, binary blob) |
TestScheduler|job2    |group1   |           |org.quartz.examples.example3.SimpleJob|0         |0               |0             |0                |(serialized org.quartz.JobDataMap, binary blob) |
TestScheduler|job4    |group1   |           |org.quartz.examples.example2.SimpleJob|0         |0               |0             |0                |(serialized org.quartz.JobDataMap, binary blob) |
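
The NEXT_FIRE_TIME, PREV_FIRE_TIME and START_TIME values in the triggers table are epoch milliseconds (the BIGINT(13) columns in the schema). Decoding a few of them makes the "store the next fire time, recompute after each run" model easier to see. This sketch only uses java.time, and the numbers are copied from the rows above. `FireTimes` and `gap` are names made up for this sketch.

```java
import java.time.Duration;
import java.time.Instant;

/** Decodes epoch-millisecond fire times copied from the QRTZ_TRIGGERS rows; not Quartz code. */
final class FireTimes {

    /** Gap between the previous and the next fire time, both in epoch milliseconds. */
    static Duration gap(long prevFireMillis, long nextFireMillis) {
        return Duration.between(
            Instant.ofEpochMilli(prevFireMillis),
            Instant.ofEpochMilli(nextFireMillis));
    }

    public static void main(String[] args) {
        // trigger1: has not fired yet (PREV_FIRE_TIME = -1), so only the next fire time is meaningful
        System.out.println("trigger1 next: " + Instant.ofEpochMilli(1704356160000L));
        // trigger2 (CRON) and trigger4 (SIMPLE): previous vs. next fire time
        System.out.println("trigger2 gap:  " + gap(1704355160000L, 1704355180000L));
        System.out.println("trigger4 gap:  " + gap(1704355157257L, 1704355167257L));
    }
}
```

trigger1's next fire time decodes to 2024-01-04T08:16:00Z, a round minute as expected from evenMinuteDate. The gaps come out as 20 seconds for trigger2 and 10 seconds for trigger4, which would be the intervals of those two schedules if no fire was missed in between.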