Apache IoTDB 查询引擎目前采用 MPP 架构,一条查询 SQL 大致会经历下图几个阶段:
FragmentInstance 是分布式计划被拆分后实际分发到各个节点进行执行的实例。由于每个节点会同时接收来自于多个并发 Query 的多个 FragmentInstance,这些 FragmentInstance 在执行时可能由于等待上游数据而处于阻塞状态、或者数据就绪可以执行、或者超时需要被取消。因此,需要一个较为合理的调度策略,保证在分配给 FragmentInstance 的有限资源内,能够满足高并发的查询需求,同时尽可能避免出现饿死或者死锁情况。
在具体实现中,查询引擎里真正执行查询计算的算子树 Operator Tree 是类 Driver 的一个成员变量,Driver 负责控制Operator 的运行。DriverTask 是 Driver 的一层封装,也是调度模块真正调度的对象。一个 FragmentInstance 可能对应多个 Driver,而 Driver 与 DriverTask 是一一对应的。
本文主要介绍 Apache IoTDB 查询引擎在 DataNode 上如何调度和执行 DriverTask。相关代码位于包 org.apache.iotdb.db.mpp.execution.schedule
调度模块维护了两个队列:
同时处于 Blocked 状态的 DriverTask 会被放入集合 blockedTasks 进行记录。
总体而言,DriverTask 的调度执行参考了协程思想和操作系统任务调度机制。分配给查询引擎调度模块的线程数是固定的,可以通过配置项更改。来自于不同节点的 FragmentInstance 的 DriverTask 在 init 时会被加入 ReadyQueue。执行线程会不断拉取 ReadyQueue 队头的任务进行执行,每次只执行一个时间片,然后根据 DriverTask 的状态决定是否要将 DriverTask 重新放回 ReadyQueue。可以结合下图帮助理解:
如上图所示,目前 DriverTask 的状态包括:
上图包含了调度模块的一些重要组件,下面对调度模块重要组件进行介绍,理解这些组件的作用可以帮助您更好地阅读源码。
真正负责执行 DriverTask 的物理线程,具体实现类为 DriverSchedulerThread。数量可通过配置参数进行配置,实例启动后不可改变。
DriverSchedulerThread 的实现:
具体流程可以结合下图进行理解:
代码实现为:
public void execute(DriverTask task) throws InterruptedException {
long startNanos = ticker.read();
// try to switch it to RUNNING
if (!scheduler.readyToRunning(task)) {
return;
}
IDriver driver = task.getDriver();
CpuTimer timer = new CpuTimer();
ListenableFuture<?> future = driver.processFor(EXECUTION_TIME_SLICE);
CpuTimer.CpuDuration duration = timer.elapsedTime();
// If the future is cancelled, the task is in an error and should be thrown.
if (future.isCancelled()) {
task.setAbortCause(DriverTaskAbortedException.BY_ALREADY_BEING_CANCELLED);
scheduler.toAborted(task);
return;
}
long quantaScheduledNanos = ticker.read() - startNanos;
ExecutionContext context = new ExecutionContext();
context.setCpuDuration(duration);
context.setScheduledTimeInNanos(quantaScheduledNanos);
context.setTimeSlice(EXECUTION_TIME_SLICE);
if (driver.isFinished()) {
scheduler.runningToFinished(task, context);
return;
}
if (future.isDone()) {
scheduler.runningToReady(task, context);
} else {
scheduler.runningToBlocked(task, context);
future.addListener(
() -> {
try (SetThreadName driverTaskName2 =
new SetThreadName(task.getDriver().getDriverTaskId().getFullId())) {
scheduler.blockedToReady(task);
}
},
listeningExecutor);
}
}
}
负责监控 DriverTask 超时的物理线程,全局唯一,具体实现类为 DriverTaskTimeoutSentinelThread。
DriverTaskTimeoutSentinelThread 的实现:
可以结合下图进行理解:
目前实现参考了 Trino 的 MultilevelSplitQueue,在 IoTDB 里的实现类为 MultilevelPriorityQueue,设计思路可以参考博客 Trino 源码阅读 —— MultiLevelSplitQueue 调度机制。
该队列特点:
根据 DriverTask 的超时 deadline 排序的最大堆,超时时间越早的 DriverTask 就会被先做超时检查。
该队列长度应该有最大限制。
该队列特点:
处于 Blocked 状态的 DriverTask 的集合,线程安全,在 O(1) 的时间复杂度内完成元素的读取。
调度模块的核心,持有线程资源,即之前提到的 WorkerThread 和 SentinelThread。维护了 ReadyQueue 和 TimeoutQueue,FragmentInstance 可以通过 DriverScheduler 提交 Driver,DriverScheduler 负责将 Driver 封装成 DriverTask 并进一步执行。
DriverScheduler 负责切换 DriverTask 的状态,主要通过内部类 Scheduler 完成。ITaskScheduler 定义了切换 DriverTask 状态的接口,Scheduler 实现了这些接口。接口定义如下:
/** the scheduler interface of {@link DriverTask} */
public interface ITaskScheduler {
/**
* Switch a task from {@link DriverTaskStatus#BLOCKED} to {@link DriverTaskStatus#READY}.
*
* @param task the task to be switched.
*/
void blockedToReady(DriverTask task);
/**
* Switch a task from {@link DriverTaskStatus#READY} to {@link DriverTaskStatus#RUNNING}.
*
* @param task the task to be switched.
* @return true if it's switched to the target status successfully, otherwise false.
*/
boolean readyToRunning(DriverTask task);
/**
* Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#READY}.
*
* @param task the task to be switched.
* @param context the execution context of last running.
*/
void runningToReady(DriverTask task, ExecutionContext context);
/**
* Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#BLOCKED}.
*
* @param task the task to be switched.
* @param context the execution context of last running.
*/
void runningToBlocked(DriverTask task, ExecutionContext context);
/**
* Switch a task from {@link DriverTaskStatus#RUNNING} to {@link DriverTaskStatus#FINISHED}.
*
* @param task the task to be switched.
* @param context the execution context of last running.
*/
void runningToFinished(DriverTask task, ExecutionContext context);
/**
* Switch a task to {@link DriverTaskStatus#ABORTED}.
*
* @param task the task to be switched.
*/
void toAborted(DriverTask task);
总体流程可以参考下图:
红色三角处表示,当获取到锁之后,还需要再次确认 DriverTask 状态是否符合预期(在排队等锁时可能被 SentinelThread 改为 Aborted 状态)。若为 Aborted 状态,则后续流程全部跳过。
代码实现为:
@Override
public void blockedToReady(DriverTask task) {
task.lock();
try {
if (task.getStatus() != DriverTaskStatus.BLOCKED) {
return;
}
task.setStatus(DriverTaskStatus.READY);
QUERY_METRICS.recordTaskQueueTime(
BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime());
task.setLastEnterReadyQueueTime(System.nanoTime());
task.resetLevelScheduledTime();
readyQueue.push(task);
blockedTasks.remove(task);
} finally {
task.unlock();
}
}
计算并更新调度权重,将 DriverTask 加入到 ReadyQueue。
@Override
public void runningToReady(DriverTask task, ExecutionContext context) {
task.lock();
try {
if (task.getStatus() != DriverTaskStatus.RUNNING) {
return;
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.READY);
task.setLastEnterReadyQueueTime(System.nanoTime());
readyQueue.push(task);
} finally {
task.unlock();
}
}
更新调度权重,然后将 DriverTask 加入 blockedTasks。
@Override
public void runningToBlocked(DriverTask task, ExecutionContext context) {
task.lock();
try {
if (task.getStatus() != DriverTaskStatus.RUNNING) {
return;
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.BLOCKED);
task.setLastEnterBlockQueueTime(System.nanoTime());
blockedTasks.add(task);
} finally {
task.unlock();
}
}
更新调度权重,清理 DriverTask 相关信息。
@Override
public void runningToFinished(DriverTask task, ExecutionContext context) {
task.lock();
try {
if (task.getStatus() != DriverTaskStatus.RUNNING) {
return;
}
task.updateSchedulePriority(context);
task.setStatus(DriverTaskStatus.FINISHED);
clearDriverTask(task);
} finally {
task.unlock();
}
}
由于同一个 FragmentInstance 的 DriverTask 之间有依赖性,一个 DriverTask 被置为 Aborted,其余相关的 DriverTask 也应该被置为 Aborted。
@Override
public void toAborted(DriverTask task) {
try (SetThreadName driverTaskName =
new SetThreadName(task.getDriver().getDriverTaskId().getFullId())) {
task.lock();
try {
// If a task is already in an end state, it indicates that the task is finalized in other
// threads.
if (task.isEndState()) {
return;
}
logger.warn(
"The task {} is aborted. All other tasks in the same query will be cancelled",
task.getDriverTaskId());
clearDriverTask(task);
} finally {
task.unlock();
}
QueryId queryId = task.getDriverTaskId().getQueryId();
Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId);
if (queryRelatedTasks != null) {
for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
if (fragmentRelatedTasks != null) {
for (DriverTask otherTask : fragmentRelatedTasks) {
if (task.equals(otherTask)) {
continue;
}
otherTask.lock();
try {
otherTask.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
clearDriverTask(otherTask);
} finally {
otherTask.unlock();
}
}
}
}
}
}
}