Apache DolphinScheduler-1.3.9源码分析(一)

引言

随着大数据的发展,任务调度系统成为了数据处理和管理中至关重要的部分。Apache DolphinScheduler 是一款优秀的开源分布式工作流调度平台,在大数据场景中得到广泛应用。

在本文中,我们将对 Apache DolphinScheduler 1.3.9 版本的源码进行深入分析,介绍 Master 启动以及调度流程。

通过这些分析,开发者可以更好地理解 DolphinScheduler 的工作机制,并在实际使用中更高效地进行二次开发或优化。

Master Server启动

启动流程图

Master调度工作流流程图

MasterServer启动方法

public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
    this.nettyRemotingServer.start();

    // self tolerant
    this.zkMasterClient.start();
    this.zkMasterClient.setStoppable(this);

    // scheduler start
    this.masterSchedulerService.start();

    // start QuartzExecutors
    // what system should do if exception
    try {
        logger.info("start Quartz server...");
        QuartzExecutors.getInstance().start();
    } catch (Exception e) {
        try {
            QuartzExecutors.getInstance().shutdown();
        } catch (SchedulerException e1) {
            logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
        }
        logger.error("start Quartz failed", e);
    }

    /**
     * register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));

}
  • nettyServer会注册三种Command
  1. TASK_EXECUTE_ACK:Worker在接收到Master执行任务的请求后,会给Master发送一条Ack Command,告诉Master已经开始执行Task了。
  2. TASK_EXECUTE_RESPONSE:Worker在执行完Task之后,会给Master发送一条Response Command,告诉Master任务调度/执行结果。
  3. TASK_KILL_RESPONSE:Master接收到Task停止的请求会,会给Worker发送TASK_KILL_REQUEST Command,之后Worker会把Task_KILL_RESPONSE Command返回给Master。
  • 启动调度和定时器。
  • 添加ShutdownHook,关闭资源。

Master 配置文件

master.listen.port=5678

# 限制Process Instance并发调度的线程数
master.exec.threads=100

# 限制每个ProcessInstance可以执行的任务数
master.exec.task.num=20

# 每一批次可以分发的任务数
master.dispatch.task.num=3

# master需要选择一个稳定的worker去执行任务
# 算法有:Random,RoundRobin,LowerWeight。默认是LowerWeight
master.host.selector=LowerWeight

# master需要向Zookeeper发送心跳,单位:秒
master.heartbeat.interval=10

# master提交任务失败,重试次数
master.task.commit.retryTimes=5

# master提交任务失败,重试时间间隔
master.task.commit.interval=1000

# master最大cpu平均负载,只有当系统cpu平均负载还没有达到这个值,master才能调度任务
# 默认值为-1,系统cpu核数 * 2
master.max.cpuload.avg=-1

# master为其他进程保留内存,只有当系统可用内存大于这个值,master才能调度
# 默认值0.3G
master.reserved.memory=0.3

Master Scheduler启动

MasterSchedulerService初始化方法

public void init(){
    // masterConfig.getMasterExecThreads(),master.properties里master.exec.threads=100
    // 该线程池的核心线程数和最大线程数为100
    this.masterExecService = (ThreadPoolExecutor)ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
    NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}

MasterSchedulerService启动方法

public void run() {
    logger.info("master scheduler started");
    while (Stopper.isRunning()){
        try {
            // 这个方法是用来检查master cpu load和memory,判断master是否还有资源进行调度
            // 如果不能调度,Sleep 1 秒种
            boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
            if(!runCheckFlag) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                // 这里才是真正去执行调度的方法
                scheduleProcess();
            }
        } catch (Exception e) {
            logger.error("master scheduler thread error", e);
        }
    }
}

MasterSchedulerService调度方法

private void scheduleProcess() throws Exception {
    InterProcessMutex mutex = null;
    try {
        // 阻塞式获取分布式锁
        mutex = zkMasterClient.blockAcquireMutex();
        // 获取线程池的活跃线程数
        int activeCount = masterExecService.getActiveCount();
        // make sure to scan and delete command  table in one transaction
        // 获取其中一个command,必须保证操作都在一个事务里
        Command command = processService.findOneCommand();
        if (command != null) {
            logger.info("find one command: id: {}, type: {}", command.getId(),command.getCommandType());

            try{
                // 获取ProcessInstance,
                // 这个方法会根据master.exec.threads配置和活跃线程数来判断是否可以调度processInstance
                ProcessInstance processInstance = processService.handleCommand(logger,
                        getLocalAddress(),
                        this.masterConfig.getMasterExecThreads() - activeCount, command);
                if (processInstance != null) {
                    logger.info("start master exec thread , split DAG ...");
                    masterExecService.execute(
                    new MasterExecThread(
                            processInstance
                            , processService
                            , nettyRemotingClient
                            ));
                }
            }catch (Exception e){
                logger.error("scan command error ", e);
                processService.moveToErrorCommand(command, e.toString());
            }
        } else{
            //indicate that no command ,sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    } finally{
        // 释放分布式锁
        zkMasterClient.releaseMutex(mutex);
    }
}

ProcessService处理Command的方法

public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
    // 这里是去构造ProcessInstance
    ProcessInstance processInstance = constructProcessInstance(command, host);
    //cannot construct process instance, return null;
    if(processInstance == null){
        logger.error("scan command, command parameter is error: {}", command);
        moveToErrorCommand(command, "process instance is null");
        return null;
    }
    // 这里是检测当前剩余线程数是否大于等于该ProcessDefinition及其所有子Process的数量
    // 如果检测不通过,process instance的状态变为wait thread.并且返回空的process instance
    if(!checkThreadNum(command, validThreadNum)){
        logger.info("there is not enough thread for this command: {}", command);
        return setWaitingThreadProcess(command, processInstance);
    }
    processInstance.setCommandType(command.getCommandType());
    processInstance.addHistoryCmd(command.getCommandType());
    saveProcessInstance(processInstance);
    this.setSubProcessParam(processInstance);
    delCommandByid(command.getId());
    return processInstance;
}

MasterExecThread初始化方法

public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
    this.processService = processService;

    this.processInstance = processInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    // master.properties文件里的master.task.exec.num
    int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
    this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread",
                                                                    masterTaskExecNum);
    this.nettyRemotingClient = nettyRemotingClient;
}

MasterExecThread启动方法

public void run() {
    // 省略...
    try {
        if (processInstance.isComplementData() &&  Flag.NO == processInstance.getIsSubProcess()){
            // 补数逻辑... 暂不看
            executeComplementProcess();
        }else{
            // 执行task方法
            executeProcess();
        }
    }catch (Exception e){
        logger.error("master exec thread exception", e);
        logger.error("process execute failed, process id:{}", processInstance.getId());
        processInstance.setState(ExecutionStatus.FAILURE);
        processInstance.setEndTime(new Date());
        processService.updateProcessInstance(processInstance);
    }finally {
        taskExecService.shutdown();
    }
}


private void executeProcess() throws Exception {
    // 前置
    prepareProcess();
    // 执行
    runProcess();
    // 后置
    endProcess();
}

private void runProcess(){
    // 从根task开始提交
    submitPostNode(null);
    boolean sendTimeWarning = false;
    while(!processInstance.isProcessInstanceStop() && Stopper.isRunning()){
        // 省略部分代码...

        // 根据cpu load avg和Memorry判断是否可以调度
        if(canSubmitTaskToQueue()){
            submitStandByTask();
        }
        try {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(),e);
        }
        updateProcessInstanceState();
    }

    logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}

// 获取可以并行的task
/**
* task 1 -> task 2 -> task3
* task 4 -> task 5
* task 6
* task 1,task4,task6可以并行跑
*/
private void submitPostNode(String parentNodeName){
    Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
    List<TaskInstance> taskInstances = new ArrayList<>();
    for(String taskNode : submitTaskNodeList){
        taskInstances.add(createTaskInstance(processInstance, taskNode,
                dag.getNode(taskNode)));
    }
    // if previous node success , post node submit
    for(TaskInstance task : taskInstances){
        if(readyToSubmitTaskQueue.contains(task)){
            continue;
        }
        if(completeTaskList.containsKey(task.getName())){
            logger.info("task {} has already run success", task.getName());
            continue;
        }
        if(task.getState().typeIsPause() || task.getState().typeIsCancel()){
            logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
        }else{
            // task添加到priorityQueue
            addTaskToStandByList(task);
        }
    }
}

/**
 * handling the list of tasks to be submitted
 */
private void submitStandByTask(){

    try {
        int length = readyToSubmitTaskQueue.size();
        for (int i=0;i<length;i++) {
            // 从队列里面取task, 提交给worker执行
            TaskInstance task = readyToSubmitTaskQueue.peek();
            // 先判断task的前置依赖有没有都运行成功,如果运行成功,在提交该task运行

            // 如果运行失败,或者没有执行,则不提交
            DependResult dependResult = getDependResultForTask(task);
            if(DependResult.SUCCESS == dependResult){
                if(retryTaskIntervalOverTime(task)){
                    submitTaskExec(task);
                    removeTaskFromStandbyList(task);
                }
            }else if(DependResult.FAILED == dependResult){
                // if the dependency fails, the current node is not submitted and the state changes to failure.
                dependFailedTask.put(task.getName(), task);
                removeTaskFromStandbyList(task);
                logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult);
            } else if (DependResult.NON_EXEC == dependResult) {
            // for some reasons(depend task pause/stop) this task would not be submit
            removeTaskFromStandbyList(task);
            logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
           }
        }
    } catch (Exception e) {
        logger.error("submit standby task error",e);
    }
}

/**
* 创建TaskExecThread
*/
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread abstractExecThread = null;
    if(taskInstance.isSubProcess()){
        abstractExecThread = new SubProcessTaskExecThread(taskInstance);
    }else if(taskInstance.isDependTask()){
        abstractExecThread = new DependentTaskExecThread(taskInstance);
    }else if(taskInstance.isConditionsTask()){
        abstractExecThread = new ConditionsTaskExecThread(taskInstance);
    }else {
        abstractExecThread = new MasterTaskExecThread(taskInstance);
    }
    Future<Boolean> future = taskExecService.submit(abstractExecThread);
    activeTaskNode.putIfAbsent(abstractExecThread, future);
    return abstractExecThread.getTaskInstance();
}

MasterBaseTaskExecThread

MasterBaseTaskExecThreadSubProcessTaskExecThreadDependentTaskExecThreadConditionsTaskExecThreadMasterTaskExecThread的父类,实现Callable接口。

  • SubProcessTaskExecThread

    任务实例不会下发到worker节点执行,在submitTask(TaskInstance taskInstance)方法中,针对子流程,会增加一条子流程实例命令,然后在waitTaskQuit方法中循环等待子流程执行完成。在当前工作流运行结束后会继续运行子工作流并做相关状态更新,子工作流完全完成才同步状态为子工作流的状态。

  • DependentTaskExecThread

    Dependent 节点,就是依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功,依赖节点会去检查 B 流程在昨天是否有执行成功的实例。

  • ConditionsTaskExecThrea

    Conditions 是一个条件节点,根据上游任务运行状态,判断应该运行哪个下游任务。截止目前 Conditions 支持多个上游任务,但只支持两个下游任务。当上游任务数超过一个时,可以通过且以及或操作符实现复杂上游依赖。

  • MasterTaskExecThread

    将任务实例下发到worker节点执行,并在waitTaskQuit方法中循环等待任务实例执行完成,任务完成后则即出。例如SQKL,Shell等任务类型。

MasterBaseTaskExecThread初始化方法

public MasterBaseTaskExecThread(TaskInstance taskInstance){
    this.processService = SpringApplicationContext.getBean(ProcessService.class);
    this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
    this.cancel = false;
    this.taskInstance = taskInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
    initTaskParams();
}

MasterBaseTaskExecThread执行方法

@Override
public Boolean call() throws Exception {
    this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
    return submitWaitComplete(); // 由各子类实现
}

MasterBaseTaskExecThread公共方法

submit()

protected TaskInstance submit(){
    // 提交任务重试次数. master.task.commit.retryTimes=5
    Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
    // 提交任务失败,重试间隔时间 master.task.commit.interval=1000
    Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();

    int retryTimes = 1;
    boolean submitDB = false;
    boolean submitTask = false;
    TaskInstance task = null;
    while (retryTimes <= commitRetryTimes){
        try {
            if(!submitDB){
                // 持久化TaskInstance到数据库
                task = processService.submitTask(taskInstance);
                if(task != null && task.getId() != 0){
                    submitDB = true;
                }
            }
            if(submitDB && !submitTask){
                // 分发任务到Woroker执行
                submitTask = dispatchTask(task);
            }
            if(submitDB && submitTask){
                return task;
            }
            if(!submitDB){
                logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
            }else if(!submitTask){
                logger.error("task commit  failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
            }
            Thread.sleep(commitRetryInterval);
        } catch (Exception e) {
            logger.error("task commit to mysql and dispatcht task failed",e);
        }
        retryTimes += 1;
    }
    return task;
}

dispatchTask(TaskInstance task)

public Boolean dispatchTask(TaskInstance taskInstance) {
    try{
        // 如果是子流程,条件任务,依赖任务,直接返回true,不提交给worker执行
        if(taskInstance.isConditionsTask()
                || taskInstance.isDependTask()
                || taskInstance.isSubProcess()){
            return true;
        }
        if(taskInstance.getState().typeIsFinished()){
            logger.info(String.format("submit task , but task [%s] state [%s] is already  finished. ", taskInstance.getName(), taskInstance.getState().toString()));
            return true;
        }
        // task cannot submit when running
        if(taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION){
            logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
            return true;
        }
        logger.info("task ready to submit: {}", taskInstance);

        /**
         *  taskPriority
         */
        TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
                                                      processInstance.getId(),
                                                      taskInstance.getProcessInstancePriority().getCode(),
                                                      taskInstance.getId(),
                                                      org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
        // 放入TaskPriorityQueue中,
        // org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl用于消费,从队列里取出TaskInstance,提交给Worker执行
        taskUpdateQueue.put(taskPriority);
        logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
        return true;
    }catch (Exception e){
        logger.error("submit task  Exception: ", e);
        logger.error("task error : %s", JSONUtils.toJson(taskInstance));
        return false;
    }
}

MasterTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    Boolean result = false;
    // 提交任务
    this.taskInstance = submit();
    if(this.taskInstance == null){
        logger.error("submit task instance to mysql and queue failed , please check and fix it");
        return result;
    }
    if(!this.taskInstance.getState().typeIsFinished()) {
        // 等待任务执行结果
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processService.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
                this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
    return result;
}

waitTaskQuit()

public Boolean waitTaskQuit(){
    // query new state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
                this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());

    while (Stopper.isRunning()){
        try {
            if(this.processInstance == null){
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // task instance add queue , waiting worker to kill
            if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
                cancelTaskInstance();
            }
            if(processInstance.getState() == ExecutionStatus.READY_PAUSE){
                pauseTask();
            }
            // task instance finished
            if (taskInstance.getState().typeIsFinished()){
                // if task is final result , then remove taskInstance from cache
                // taskInstanceCacheManager其实现类为:org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl
                // taskInstance在触发ack和response Command会被添加到taskInstanceCache里
                taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
                break;
            }
            if (checkTaskTimeout()) {
                this.checkTimeoutFlag = !alertTimeout();
            }
            // updateProcessInstance task instance
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception",e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                             processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}

SubProcessTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {

    Boolean result = false;
    try{
        // submit task instance
        this.taskInstance = submit();

        if(taskInstance == null){
            logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it");
            return result;
        }
        setTaskInstanceState();
        waitTaskQuit();
        subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());

        // at the end of the subflow , the task state is changed to the subflow state
        if(subProcessInstance != null){
            if(subProcessInstance.getState() == ExecutionStatus.STOP){
                this.taskInstance.setState(ExecutionStatus.KILL);
            }else{
                this.taskInstance.setState(subProcessInstance.getState());
            }
        }
        taskInstance.setEndTime(new Date());
        processService.updateTaskInstance(taskInstance);
        logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",
                    this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
        result = true;

    }catch (Exception e){
        logger.error("exception: ",e);
        if (null != taskInstance) {
            logger.error("wait task quit failed, instance id:{}, task id:{}",
                         processInstance.getId(), taskInstance.getId());
        }
    }
    return result;
}

waitTaskQuit()

private void waitTaskQuit() throws InterruptedException {

    logger.info("wait sub work flow: {} complete", this.taskInstance.getName());

    if (taskInstance.getState().typeIsFinished()) {
        logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",
                    this.taskInstance.getName(),
                    this.taskInstance.getState(),
                    this.processInstance.getState());
        return;
    }
    while (Stopper.isRunning()) {
        // waiting for subflow process instance establishment
        if (subProcessInstance == null) {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            if(!setTaskInstanceState()){
                continue;
            }
        }
        subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
        if (checkTaskTimeout()) {
            this.checkTimeoutFlag = !alertTimeout();
            handleTimeoutFailed();
        }
        updateParentProcessState();
        if (subProcessInstance.getState().typeIsFinished()){
            break;
        }
        if(this.processInstance.getState() == ExecutionStatus.READY_PAUSE){
            // parent process "ready to pause" , child process "pause"
            pauseSubProcess();
        }else if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
            // parent Process "Ready to Cancel" , subflow "Cancel"
            stopSubProcess();
        }
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    }
}

ConditionsTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    try{
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                                                                 taskInstance.getProcessDefinitionId(),
                                                                 taskInstance.getProcessInstanceId(),
                                                                 taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        logger.info("dependent task start");
        // 等待判断
        waitTaskQuit();
        // 更新最终依赖结果
        updateTaskState();
    }catch (Exception e){
        logger.error("conditions task run exception" , e);
    }
    return true;
}

waitTaskQuit

private void waitTaskQuit() {
    List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
            taskInstance.getProcessInstanceId()
    );
    for(TaskInstance task : taskInstances){
        completeTaskList.putIfAbsent(task.getName(), task.getState());
    }

    // 获取所有依赖结果
    List<DependResult> modelResultList = new ArrayList<>();
    for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){

        List<DependResult> itemDependResult = new ArrayList<>();
        for(DependentItem item : dependentTaskModel.getDependItemList()){
            itemDependResult.add(getDependResultForItem(item));
        }
        DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
        modelResultList.add(modelResult);
    }
    // 根据逻辑运算符,合并依赖结果
    conditionResult = DependentUtils.getDependResultForRelation(
            dependentParameters.getRelation(), modelResultList
    );
    logger.info("the conditions task depend result : {}", conditionResult);
}

DependentTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    try{
        logger.info("dependent task start");
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        initDependParameters();
        waitTaskQuit();
        updateTaskState();
    }catch (Exception e){
        logger.error("dependent task run exception" , e);
    }
    return true;
}

waitTaskQuit()

private Boolean waitTaskQuit() {
    logger.info("wait depend task : {} complete", this.taskInstance.getName());
    if (taskInstance.getState().typeIsFinished()) {
        logger.info("task {} already complete. task state:{}",
                this.taskInstance.getName(),
                this.taskInstance.getState());
        return true;
    }
    while (Stopper.isRunning()) {
        try{
            if(this.processInstance == null){
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // 省略部分代码

            // allDependentTaskFinish()等待所有依赖任务执行结束
            if ( allDependentTaskFinish() || taskInstance.getState().typeIsFinished()){
                break;
            }
            // update process task
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception",e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}

TaskPriorityQueueConsumer

@Override
public void run() {
    List<TaskPriority> failedDispatchTasks = new ArrayList<>();
    while (Stopper.isRunning()){
        try {
            // 每一批次分发任务数量,master.dispatch.task.num = 3
            int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
            failedDispatchTasks.clear();
            for(int i = 0; i < fetchTaskNum; i++){
                if(taskPriorityQueue.size() <= 0){
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                // if not task , blocking here
                // 从队列里面获取task
                TaskPriority taskPriority = taskPriorityQueue.take();
                // 分发给worker执行
                boolean dispatchResult = dispatch(taskPriority);
                if(!dispatchResult){
                    failedDispatchTasks.add(taskPriority);
                }
            }
            if (!failedDispatchTasks.isEmpty()) {
                // 分发失败的任务,需要重新加入队列中,等待重新分发
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                // If there are tasks in a cycle that cannot find the worker group,
                // sleep for 1 second
                if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        }catch (Exception e){
            logger.error("dispatcher task error",e);
        }
    }
}


/**
 * dispatch task
 *
 * @param taskPriority taskPriority
 * @return result
 */
protected boolean dispatch(TaskPriority taskPriority) {
    boolean result = false;
    try {
        int taskInstanceId = taskPriority.getTaskId();
        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());

        if (taskInstanceIsFinalState(taskInstanceId)){
            // when task finish, ignore this task, there is no need to dispatch anymore
            return true;
        }else{
            // 分发任务
            // 分发算法支持:低负载优先算法,随机算法, 轮询算法。
            result = dispatcher.dispatch(executionContext);
        }
    } catch (ExecuteException e) {
        logger.error("dispatch error: {}",e.getMessage());
    }
    return result;
}

通过对 Apache DolphinScheduler 1.3.9 的源码分析,我们深入了解了其核心模块的设计和实现。DolphinScheduler 的 Master 架构充分保证了任务调度的高可用性和扩展性,而通过 Zookeeper 实现的集群协调则为系统提供了强大的容错机制。

如果你对 Apache DolphinScheduler 的源码有兴趣,可以深入研究其任务调度策略的细节部分,或者根据自身业务场景进行二次开发,充分发挥 DolphinScheduler 的调度能力。

本文完!

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/885250.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

低空经济时代:无人机飞行安全要点详解

随着低空经济的蓬勃发展&#xff0c;无人机&#xff08;UAV&#xff09;在农业、航拍、物流、应急救援等多个领域的应用日益广泛。然而&#xff0c;无人机的安全飞行不仅关乎任务的成功与否&#xff0c;更直接关系到地面人员、财产及空中交通的安全。本文将从飞行前检查、环境评…

react crash course 2024(7) react router dom

安装 npm i react-router-dom 引入 import {Route,createBrowserRouter,createRoutesFromElements,RouterProvider} from react-router-dom 在app.jsx const router createBrowserRouter(createRoutesFromElements(<Route index element {<h1>My App</h1>…

Centos 8安装VNC及多用户配置详细教程

Centos 8安装VNC及多用户配置详细教程 参考一、安装前准备二、安装三、创建新用户和设置VNC密码四、创建VNC系统服务文件五、多用户映射和配置VNC六、客户端用VNC Viewer登录 参考 1、参考1&#xff1a; VNC安装英文说明&#xff08;英文说明有误且仅适合单用户&#xff09;&a…

tauri中加载本地文件图片或者下载网络文件图片后存储到本地,然后通过前端页面展示

有一个需求是需要将本地上传的文件或者网络下载的文件存储到本地&#xff0c;并展示在前端页面上的。其实如果只是加载本地文件&#xff0c;然后展示还是挺简单的&#xff0c;可以看我的文章&#xff1a;tauri程序加载本地图片或者文件在前端页面展示-CSDN博客 要想实现上述需…

[OpenGL]使用OpenGL加载obj模型、绘制纹理

一、简介 本文介绍了如何使用OpenGL加载obj模型&#xff0c;绘制简单纹理。 在加载obj模型时&#xff0c;使用glm库确定对顶点进行坐标变换的MVP(model, view, projection)矩阵。 在绘制纹理时&#xff0c;使用stb_image.h头文件加载纹理图片&#xff0c;生成纹理。 按照本文…

108.WEB渗透测试-信息收集-FOFA语法(8)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;107.WEB渗透测试-信息收集-FOFA语法&#xff08;7&#xff09; 指挥系统的后台&#xff…

排序题目:重新排列后的最大子矩阵

文章目录 题目标题和出处难度题目描述要求示例数据范围 解法思路和算法代码复杂度分析 题目 标题和出处 标题&#xff1a;重新排列后的最大子矩阵 出处&#xff1a;1727. 重新排列后的最大子矩阵 难度 7 级 题目描述 要求 给定一个大小为 m n \texttt{m} \times \tex…

可以白嫖PPT模板的6个网站,赶紧收藏

推荐6个PPT模板网站&#xff0c;免费下载&#xff0c;绝对的高质量&#xff0c;赶紧收藏&#xff01; 1、菜鸟图库 ppt模板免费下载|ppt背景图片 - 菜鸟图库 菜鸟图库网有非常丰富的免费素材&#xff0c;像设计类、办公类、自媒体类等素材都很丰富。PPT模板种类很多&#xff0…

Python和MATLAB库尔巴克–莱布勒散度信息论统计学生物学和算法模型

&#x1f3af;要点 高斯混合模型聚类和t分布随机邻域嵌入底层分析信息论测量复合彩票统计学计算结果离散分布速率最优估计器样本统计相似性快速闭环散度和交叉熵计算催乳素诱导模型贝叶斯快速推理模型视觉皮层活动神经数据分布 Python散度 在数理统计中&#xff0c;库尔巴克…

Gson将对象转换为JSON(学习笔记)

JSON有两种表示结构&#xff0c;对象和数组。对象结构以"{"大括号开始&#xff0c;以"}"大括号结束。中间部分由0或多个以”&#xff0c;"分隔的”key(关键字)/value(值)"对构成&#xff0c;关键字和值之间以":"分隔&#xff0c;语法结…

PHP程序如何实现限制一台电脑登录?

PHP程序如何实现限制一台电脑登录&#xff1f; 可以使用以下几种方法&#xff1a; 1. IP地址限制&#xff1a;在PHP中&#xff0c;可以通过获取客户端的IP地址&#xff0c;然后与允许登录的IP地址列表进行比对。如果客户端的IP地址不在列表中&#xff0c;就禁止登录。 “php $…

HTTP 1.0 2.0 3.0详解

HTTP HTTP全称超文本传输协议&#xff0c;是一种属于应用层的通信协议。它允许将超文本标记语言文档&#xff08;HTML&#xff09;从Web服务器传输到客户端的浏览器。 HTTP报文结构 请求报文结构 请求方法&#xff1a; GET&#xff1a;一般用来请求已被URI识别的资源&#x…

永不失联!遨游双卫星三防手机成为高效应急关键所在

今年9月被戏称为“台风月”&#xff0c;台风“摩羯”、“贝碧嘉”以及热带气旋“普拉桑”接连来袭&#xff0c;极端天气不仅导致了电力中断、道路损毁&#xff0c;更使得传统的通信网络遭受重创&#xff0c;给应急通信保障工作带来了极大的压力。面对“三断”的实战难题&#x…

从文本图片到多模态:3D 数字人打开企业全域商业增长新空间

摘要&#xff1a;数字化与AI浪潮推动各行业变革&#xff0c;内容形式也发生巨变&#xff0c;从文本到多媒体的多模态表达&#xff0c;标志着内容创造走向升维。AIGC 3D生成技术的突飞猛进&#xff0c;彻底打破了传统3D内容生产门槛高、周期长、成本高昂的问题。将3D数字人的打造…

数学建模研赛总结

目录 前言进度问题四分析问题五分析数模论文经验分享总结 前言 本文为博主数学建模比赛第五天的内容记录&#xff0c;希望所写的一些内容能够对大家有所帮助&#xff0c;不足之处欢迎大家批评指正&#x1f91d;&#x1f91d;&#x1f91d; 进度 今天已经是最后一天了&#xf…

nginx常用的性能优化

第一步调整工作进程数&#xff1a; 设置成auto&#xff0c;会自动按照CPU核心数来启动工作进程数&#xff0c;如果设置具体数字&#xff0c;则只会使用指定数量的CPU核心&#xff0c;无法将CPU同一时间都能用得到&#xff0c;所以就不能发挥服务器的最大的性能。 第二步增加进程…

el-table添加fixed后错位问题

1 方案1 return {isShow:false, }mounted() {this.isShowtrue},watch: {$route(newRoute) {this.monitoredRoute newRoute; // 将新的路由信息保存到组件的monitoredRoute属性中// 执行其他操作或调用其他方法},//或$route(newRoute) {this.monitoredRoute newRoute; // 将新…

Java项目实战II基于Java+Spring Boot+MySQL的免税商品优选购物商城(源码+数据库+文档)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者 一、前言 随着全球贸易的日益繁荣和消费者需求的多样化&#xff0c;免税商品购物已成为众多旅行者和消费者的热…

浸没式密封连接器

在当今科技快速发展的背景下&#xff0c;电子设备的整合度与性能需求持续提高&#xff0c;而连接技术作为电子设备间交互的关键&#xff0c;其重要性显而易见。在各式各样的连接技术当中&#xff0c;浸没式密封连接器凭借其独到设计和高超性能&#xff0c;在特定使用环境中显示…

SpringBoot3中ymal配置文件(持续更新)

博客主页&#xff1a;音符犹如代码系列专栏&#xff1a;JavaWeb关注博主&#xff0c;后期持续更新系列文章如果有错误感谢请大家批评指出&#xff0c;及时修改感谢大家点赞&#x1f44d;收藏⭐评论✍ 在SpringBoot项目中,使用application.properties进行配置管理时&#xff0c;…