DolphinScheduler笔记之2: 日志

 

1. 日志的作用

对架构RD而言,对模块的了解不应当仅仅局限于正常流程本身,而是能够分析各种异常场景,提前添加对应的日志和Metrics等信息。例如日志的格式、位置,日志级别能否Change on-the-fly等,目的是能够通过日志分析流量、追查问题。

日志有两种:

  1. write-ahead logging,例如leveldb笔记之2:日志
  2. diagnostic logs,例如glog源码解析一:整体结构

这篇笔记想要谈的是第2种。

任务调度系统除了模块自身的日志,还需要打印不同任务的日志,日志互不影响

举个例子,ds-worker 模块日志有两个appender: WORKERLOGFILE、TASKLOGFILE.
WORKERLOGFILE 负责输出模块日志,TASKLOGFILE 负责输出每个 task 的日志。

2. 模块日志

模块日志最常见的想法是 logid、rpcid、traceid 等,无外乎以下两个目的:

  1. 通过唯一id串联模块内部的处理流程:多个方法、线程间传递
  2. 通过唯一id串联系统整体的处理流程:多个模块、服务间的调用关系

例如阿里中台里提到的鹰眼系统。

2.1. Mapped Diagnostic Context

任务调度系统需要调度、执行不同的任务,因此需要考虑按照任务id串联所有日志,而在调度时为了确保任务互不影响,单个任务会在单独的线程、进程里处理。

MDC1非常适合这个背景:

Most real-world distributed systems need to deal with multiple clients simultaneously. In a typical multithreaded implementation of such a system, different threads will handle different clients. A possible but slightly discouraged approach to differentiate the logging output of one client from another consists of instantiating a new and separate logger for each client. This technique promotes the proliferation of loggers and may increase their management overhead.

简言之:MDC 支持指定 log 的 context,N 条 log 语句之间可以共用该 context.

public class MDC
	static MDCAdapter mdcAdapter;

	static void put(String key, String val) {
		mdcAdapter.put(key, val);
	}

	staitc void remove(String key) {
		mdcAdapter.remove(key);
	}

MDCAdapter是一个接口,各个日志库有自己的实现,基本上都是基于 thread-local 的变量,例如

class LogbackMDCAdapter implements MDCAdapter
	final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<Map<String, String>>();

因此原理上就比较清楚了,线程内设置变量,配置日志格式,需要时从日志里取出来打印设置的字段值。

2.2. 应用

DS的线程模型里介绍过,任务的执行封装在WorkerTaskExecuteRunnable::run.

在该方法里,任务执行前会调用 MDC set processInstanceId、taskInstanceId,执行完成后remove。

abstract class WorkerTaskExecuteRunnable implements Runnable
	public void run()
		try 
			LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
                    taskExecutionContext.getTaskInstanceId());
			exeucteTask(...);
		...
		finally
			LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();

在日志 pattern 里配置[WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}],就可以方便的查找指定 process task 的日志了。

例如:

[INFO] 2023-03-19 20:17:22.377 +0800 TaskLogLogger-class org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable:[261] - [WorkflowInstance-1][TaskInstance-2] - Send task execute result to master,... 

3. 任务日志

模块日志输出模块级别的信息,例如任务的调度流程、任务的返回结果、是否重试等。任务日志输出任务级别的信息,主要是任务自身的执行过程。

因此,每个任务需要写入单独的日志文件,任务结束时关闭文件,方便查看。

3.1. SiftingAppender

不同于我们常用的RollingFileAppenderSiftingAppender的作用在于分发。解析日志Event,将日志分发到不同的真正负责输出日志的 Appender.

举个例子,日志配置如下:

    <appender name="SIFTLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
        <!--discriminator鉴别器,根据taskId这个key对应的value鉴别日志事件,然后委托给具体appender写日志-->
        <!-- in the absence of the class attribute, it is assumed that the
         desired discriminator type is
         ch.qos.logback.classic.sift.MDCBasedDiscriminator -->
        <discriminator>
            <key>taskId</key>
            <defaultValue>defaultTaskId</defaultValue>
        </discriminator>
        <sift>
            <!--具体的写日志appender,每一个taskId创建一个文件-->
            <appender name="File-${taskId}" class="ch.qos.logback.core.FileAppender">
                <file>/tmp/dolphinscheduler/taskLogs/${taskId}</file>
                <append>true</append>
                <encoder charset="UTF-8">
                    <pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %-5level %logger{35} %marker - %msg%n</pattern>
                </encoder>
            </appender>
        </sift>
    </appender>

代码里打印日志前后,使用 MDC 写入taskId的值=12315

    MDC.put("taskId", "12315");
    logger.info("this is a log IN MDC.");
    MDC.remove("taskId");

日志就输出到单独的日志文件/tmp/dolphinscheduler/taskLogs/12315了。

之所以能够达到这样的效果,是因为如果不指定 Discriminator class 的话,默认使用 MDCBasedDiscriminator 类。该类使用 MDC 更新 discriminator.key,如果该值为空,则使用 defaultValue.

MDCBasedDiscriminator 继承自 Discriminator,getDiscriminatingValue返回 discriminator.key 设置的值。

public class MDCBasedDiscriminator extends AbstractDiscriminator<ILoggingEvent> {

    private String key;
    private String defaultValue;

    /**
     * Return the value associated with an MDC entry designated by the Key
     * property. If that value is null, then return the value assigned to the
     * DefaultValue property.
     */
    public String getDiscriminatingValue(ILoggingEvent event) {
        // http://jira.qos.ch/browse/LBCLASSIC-213
        Map<String, String> mdcMap = event.getMDCPropertyMap();
        if (mdcMap == null) {
            return defaultValue;
        }
        String mdcValue = mdcMap.get(key);
        if (mdcValue == null) {
            return defaultValue;
        } else {
            return mdcValue;
        }
    }
    ...
}

我们也可以仿照实现自己的 Discriminator 类,在 DolphinScheduler 里即是如此。

3.2. Marker

Marker 的主要作用是两个:过滤日志;触发某些行为。

例如 SLF4J FAQ3 里提到的:Why doesn’t the org.slf4j.Logger interface have methods for the FATAL level?,因为可以通过 Marker 实现这点:

Marker fatal = MarkerFactory.getMarker("FATAL");
...
logger.error(fatal, "Failed to obtain JDBC connection", e);

触发行为上,例如:

  • SMTPAppender 如果看到 LoggingEvent marked with the NOTIFY_ADMIN,就会发送邮件
  • SiftingAppender 如果看到 LoggingEvent marked with the FINALIZE_SESSION_MARKER,就会结束该 appender4

其中第二点就是为了避免使用SiftingAppender时同时打开过多的文件句柄,通过FINALIZE_SESSION_MARKER就可以及时关闭结束的任务日志。

3.3. 应用-DS日志配置

ds-worker 的日志配置使用到了SiftingAppenderTaskLogFilterTaskLogDiscriminator:

    <appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
        <filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
        <Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
            <key>taskAppId</key>
            <logBase>${log.base}</logBase>
        </Discriminator>
        <sift>
            <appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
                ...
            </appender>
        </sift>
    </appender>

我们重点看下 filter 和 discriminator.

TaskLogFitler 根据日志事件决定输出哪些日志:

public class TaskLogFilter extends Filter<ILoggingEvent> {
    ...
    public FilterReply decide(ILoggingEvent event) {
        FilterReply filterReply = FilterReply.DENY;
        if ((event.getThreadName().startsWith(TaskConstants.TASK_APPID_LOG_FORMAT)
                && event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME))
                || event.getLevel().isGreaterOrEqual(level)) {
            filterReply = FilterReply.ACCEPT;
        }
        logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(),
                event.getLoggerName(), filterReply.name(), level);
        return filterReply;
    }
}

可以看到,满足以下两个条件之一会打印日志:
1.线程名以 ‘taskAppId’ 开头 && logger 名以 ‘TaskLogLogger’ 开头
2.日志级别高于设置的级别

TaskLogDiscriminator的实现跟MDCBasedDiscriminator类似

public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
    ...
    public String getDiscriminatingValue(ILoggingEvent event) {
        String key = "unknown_task";
        if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) {
            String threadName = event.getThreadName();
            if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) {
                threadName =
                        threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length());
            }
            String part1 = threadName.split(Constants.EQUAL_SIGN)[1];
            String prefix = TaskConstants.TASK_LOGGER_INFO_PREFIX + "-";
            if (part1.startsWith(prefix)) {
                key = part1.substring(prefix.length()).replaceFirst("-", "/");
            }
        }
        logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(),
                event.getLoggerName());
        return key;
    }

可以看到日志文件名是根据 threadName 确认的,同时也要求 logger 名字以 ‘TaskLogLogger’ 开头。

3.4. 应用-ShellTask

线程名是在WorkerTaskExecuteRunnable的设置的。

public abstract class WorkerTaskExecuteRunnable implements Runnable {
    protected WorkerTaskExecuteRunnable(..) {
        ...
        String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
                taskExecutionContext.getProcessDefineCode(),
                taskExecutionContext.getProcessDefineVersion(),
                taskExecutionContext.getProcessInstanceId(),
                taskExecutionContext.getTaskInstanceId());
        taskExecutionContext.setTaskLogName(taskLogName);
    }

    public void run() {
        try {
            // set the thread name to make sure the log be written to the task log file
            Thread.currentThread().setName(taskExecutionContext.getTaskLogName());

实现在LoggerUtils.buildTaskId,格式为taskAppId=TASK-提交日期-工作流定义code_工作流定义version-工作流实例id-任务实例id,例如 TASK-20230318-7475262133504_1-1-1

结合TaskLogDiscriminator的实现,因此日志路径即为:20230318/7475262133504_1-1-1

Logger名则是在任务基类AbstractTask里定义:

public abstract class AbstractTask {

    public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION");

    protected final Logger logger =
            LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));

FINALIZE_SESSION_MARKER用于在任务结束时关闭 appender,因此在任务结束时我们就会看到如下日志:

[INFO] 2023-03-19 20:25:02.616 +0800 - FINALIZE_SESSION

综合上面的逻辑,任务日志也就会输出到单独的文件了。

4. 总结

DolphinScheduler的日志里,用到了诸如 MDC、Discriminator、Filter、Marker 等诸多特性,主要是为了两个目的:

  1. 模块日志可以按照 ProcessInstanceId、TaskInstanceId grep查找
  2. 任务日志可以输出到单独的文件,没有使用默认的MDCBasedDiscriminator,而是自定义了TaskLogDiscriminator以确保日志更容易检索

因此,这里对于线程名、logger名的设置准确就非常重要了。线程名taskAppId=TASK-提交日期-工作流定义code_工作流定义version-工作流实例id-任务实例id决定了任务日志的路径,logger名则必须以’TaskLogLogger’开头。

不过同时由于这样的限制,也就不得不实现logHandle这种用于输出其他类日志的方法:

    public void logHandle(LinkedBlockingQueue<String> logs) {
        // note that the "new line" is added here to facilitate log parsing
        if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
            logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
        } else {
            StringJoiner joiner = new StringJoiner("\n\t");
            while (!logs.isEmpty()) {
                joiner.add(logs.poll());
            }
            logger.info(" -> {}", joiner);
        }
    }

由于日志输出位置变了,会带来一些可读性的降低。

5. 参考资料

  1. Mapped Diagnostic Context
  2. Appenders
  3. Frequently Asked Questions about SLF4J
  4. Getting the timeout right
  5. What are markers in Java Logging frameworks and what is a reason to use them?