Master、Worker是DolphinScheduler最重要的两个模块,Master负责任务的调度,Worker负责任务的执行。
任务提交过程中,使用到了众多线程,其中最重要的单线程、线程池,如图所示:
DolphinScheduler 的线程模型总的来说:
- 串联了多个生产者-消费者,队列使用内存队列
- 线程定义了独立的名字区分
- Master-Worker 之间通过 Netty 通信
1. Master
定时调度的处理入口类是class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCloseable
,这是一个单线程。
findCommands
方法查询出待调度的任务,通过command2ProcessInstance
转化为ProcessInstance
。构造WorkflowExecuteRunnable
,作为生产者传入WorkflowEventQueue
。class WorkflowEventLooper extends BaseDaemonThread
消费WorkflowEventQueue
,调用WorkflowStartEventHandler.handleWorkflowEvent
,将WorkflowExecuteRunnable::call
方法交给WorkflowExecuteThreadPool
class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor
是一个线程池,执行传入的方法。WorkflowExecuteRunnable::call
:根据 ProcessInstance 构造 DAG,即产出多个关联的 TaskInstance,提交到TaskPriorityQueue
class TaskPriorityQueueConsumer extends BaseDaemonThread
单线程消费上述队列,调用dispatchTask
方法,通过consumerThreadPoolExecutor
线程池将任务发送到各个Worker节点
2. Worker
- Worker接收Master请求的处理类是
class TaskDispatchProcessor implements NettyRequestProcessor
,处理方法process(Channel channel, Command command)
。构造WorkerDelayTaskExecuteRunnable
,写入到DelayQueue<WorkerDelayTaskExecuteRunnable>
class WorkerManagerThread implements Runnable
内部启动一个线程,消费上述队列,将任务提交到线程池class WorkerExecService
维护一个线程池,执行WorkerTaskExecuteRunnable::call
方法,该方法会根据具体Task类型执行对应任务:ShellTask/SqlTask/FlinkTask等。
以上即一个例行任务的基本流程。从工程角度看,一个系统还需要能够充分考虑各类异常情况,比如:
- Master、Worker是如何容错的,如何确保单个工作流只会调度在一个实例上?
- Worker打满时,任务发送到 Worker 是如何处理的?如何确保任务不丢?
- DolphinScheduler 系统强依赖数据库,那如何确保数据库记录与真实任务状态的一致性?
- 如何对外暴露系统当前状态?例如任务数、队列是否打满等等
诸如此类的一堆问题,是我们应用到生产环境务必需要提前考虑的。以及更详细的 Master、Worker 处理流程代码分析,在接下里的笔记中会逐步分享。
PREVIOUSDivide And Conquer-读《大脑减压的子弹笔记术》
NEXT2022年个人总结