1. 日志的作用
对架构RD而言,对模块的了解不应当仅仅局限于正常流程本身,而是能够分析各种异常场景,提前添加对应的日志和Metrics等信息。例如日志的格式、位置,日志级别能否Change on-the-fly等,目的是能够通过日志分析流量、追查问题。
日志有两种:
- write-ahead logging,例如leveldb笔记之2:日志
- diagnostic logs,例如glog源码解析一:整体结构
这篇笔记想要谈的是第2种。
任务调度系统除了模块自身的日志,还需要打印不同任务的日志,日志互不影响。
举个例子,ds-worker 模块日志有两个appender: WORKERLOGFILE、TASKLOGFILE.
WORKERLOGFILE 负责输出模块日志,TASKLOGFILE 负责输出每个 task 的日志。
2. 模块日志
模块日志最常见的想法是 logid、rpcid、traceid 等,无外乎以下两个目的:
- 通过唯一id串联模块内部的处理流程:多个方法、线程间传递
- 通过唯一id串联系统整体的处理流程:多个模块、服务间的调用关系
例如阿里中台里提到的鹰眼系统。
2.1. Mapped Diagnostic Context
任务调度系统需要调度、执行不同的任务,因此需要考虑按照任务id串联所有日志,而在调度时为了确保任务互不影响,单个任务会在单独的线程、进程里处理。
MDC1非常适合这个背景:
Most real-world distributed systems need to deal with multiple clients simultaneously. In a typical multithreaded implementation of such a system, different threads will handle different clients. A possible but slightly discouraged approach to differentiate the logging output of one client from another consists of instantiating a new and separate logger for each client. This technique promotes the proliferation of loggers and may increase their management overhead.
简言之:MDC 支持指定 log 的 context,N 条 log 语句之间可以共用该 context.
public class MDC
static MDCAdapter mdcAdapter;
static void put(String key, String val) {
mdcAdapter.put(key, val);
}
staitc void remove(String key) {
mdcAdapter.remove(key);
}
MDCAdapter
是一个接口,各个日志库有自己的实现,基本上都是基于 thread-local 的变量,例如
class LogbackMDCAdapter implements MDCAdapter
final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<Map<String, String>>();
因此原理上就比较清楚了,线程内设置变量,配置日志格式,需要时从日志里取出来打印设置的字段值。
2.2. 应用
在DS的线程模型里介绍过,任务的执行封装在WorkerTaskExecuteRunnable::run
.
在该方法里,任务执行前会调用 MDC set processInstanceId、taskInstanceId,执行完成后remove。
abstract class WorkerTaskExecuteRunnable implements Runnable
public void run()
try
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
exeucteTask(...);
...
finally
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
在日志 pattern 里配置[WorkflowInstance-%X{workflowInstanceId:-0}][TaskInstance-%X{taskInstanceId:-0}]
,就可以方便的查找指定 process task 的日志了。
例如:
[INFO] 2023-03-19 20:17:22.377 +0800 TaskLogLogger-class org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable:[261] - [WorkflowInstance-1][TaskInstance-2] - Send task execute result to master,...
3. 任务日志
模块日志输出模块级别的信息,例如任务的调度流程、任务的返回结果、是否重试等。任务日志输出任务级别的信息,主要是任务自身的执行过程。
因此,每个任务需要写入单独的日志文件,任务结束时关闭文件,方便查看。
3.1. SiftingAppender
不同于我们常用的RollingFileAppender
,SiftingAppender
的作用在于分发。解析日志Event,将日志分发到不同的真正负责输出日志的 Appender.
举个例子,日志配置如下:
<appender name="SIFTLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<!--discriminator鉴别器,根据taskId这个key对应的value鉴别日志事件,然后委托给具体appender写日志-->
<!-- in the absence of the class attribute, it is assumed that the
desired discriminator type is
ch.qos.logback.classic.sift.MDCBasedDiscriminator -->
<discriminator>
<key>taskId</key>
<defaultValue>defaultTaskId</defaultValue>
</discriminator>
<sift>
<!--具体的写日志appender,每一个taskId创建一个文件-->
<appender name="File-${taskId}" class="ch.qos.logback.core.FileAppender">
<file>/tmp/dolphinscheduler/taskLogs/${taskId}</file>
<append>true</append>
<encoder charset="UTF-8">
<pattern>[%d{yyyy-MM-dd HH:mm:ss.SSS}] %-5level %logger{35} %marker - %msg%n</pattern>
</encoder>
</appender>
</sift>
</appender>
代码里打印日志前后,使用 MDC 写入taskId
的值=12315
MDC.put("taskId", "12315");
logger.info("this is a log IN MDC.");
MDC.remove("taskId");
日志就输出到单独的日志文件/tmp/dolphinscheduler/taskLogs/12315
了。
之所以能够达到这样的效果,是因为如果不指定 Discriminator class 的话,默认使用 MDCBasedDiscriminator 类。该类使用 MDC 更新 discriminator.key,如果该值为空,则使用 defaultValue.
MDCBasedDiscriminator 继承自 Discriminator,getDiscriminatingValue
返回 discriminator.key 设置的值。
public class MDCBasedDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
private String key;
private String defaultValue;
/**
* Return the value associated with an MDC entry designated by the Key
* property. If that value is null, then return the value assigned to the
* DefaultValue property.
*/
public String getDiscriminatingValue(ILoggingEvent event) {
// http://jira.qos.ch/browse/LBCLASSIC-213
Map<String, String> mdcMap = event.getMDCPropertyMap();
if (mdcMap == null) {
return defaultValue;
}
String mdcValue = mdcMap.get(key);
if (mdcValue == null) {
return defaultValue;
} else {
return mdcValue;
}
}
...
}
我们也可以仿照实现自己的 Discriminator 类,在 DolphinScheduler 里即是如此。
3.2. Marker
Marker 的主要作用是两个:过滤日志;触发某些行为。
例如 SLF4J FAQ3 里提到的:Why doesn’t the org.slf4j.Logger interface have methods for the FATAL level?,因为可以通过 Marker 实现这点:
Marker fatal = MarkerFactory.getMarker("FATAL");
...
logger.error(fatal, "Failed to obtain JDBC connection", e);
触发行为上,例如:
- SMTPAppender 如果看到 LoggingEvent marked with the NOTIFY_ADMIN,就会发送邮件
- SiftingAppender 如果看到 LoggingEvent marked with the FINALIZE_SESSION_MARKER,就会结束该 appender4
其中第二点就是为了避免使用SiftingAppender
时同时打开过多的文件句柄,通过FINALIZE_SESSION_MARKER
就可以及时关闭结束的任务日志。
3.3. 应用-DS日志配置
ds-worker 的日志配置使用到了SiftingAppender
、TaskLogFilter
、TaskLogDiscriminator
:
<appender name="TASKLOGFILE" class="ch.qos.logback.classic.sift.SiftingAppender">
<filter class="org.apache.dolphinscheduler.service.log.TaskLogFilter"/>
<Discriminator class="org.apache.dolphinscheduler.service.log.TaskLogDiscriminator">
<key>taskAppId</key>
<logBase>${log.base}</logBase>
</Discriminator>
<sift>
<appender name="FILE-${taskAppId}" class="ch.qos.logback.core.FileAppender">
...
</appender>
</sift>
</appender>
我们重点看下 filter 和 discriminator.
TaskLogFitler
根据日志事件决定输出哪些日志:
public class TaskLogFilter extends Filter<ILoggingEvent> {
...
public FilterReply decide(ILoggingEvent event) {
FilterReply filterReply = FilterReply.DENY;
if ((event.getThreadName().startsWith(TaskConstants.TASK_APPID_LOG_FORMAT)
&& event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME))
|| event.getLevel().isGreaterOrEqual(level)) {
filterReply = FilterReply.ACCEPT;
}
logger.debug("task log filter, thread name:{}, loggerName:{}, filterReply:{}, level:{}", event.getThreadName(),
event.getLoggerName(), filterReply.name(), level);
return filterReply;
}
}
可以看到,满足以下两个条件之一会打印日志:
1.线程名以 ‘taskAppId’ 开头 && logger 名以 ‘TaskLogLogger’ 开头
2.日志级别高于设置的级别
TaskLogDiscriminator
的实现跟MDCBasedDiscriminator
类似
public class TaskLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {
...
public String getDiscriminatingValue(ILoggingEvent event) {
String key = "unknown_task";
if (event.getLoggerName().startsWith(TaskConstants.TASK_LOG_LOGGER_NAME)) {
String threadName = event.getThreadName();
if (threadName.endsWith(TaskConstants.GET_OUTPUT_LOG_SERVICE)) {
threadName =
threadName.substring(0, threadName.length() - TaskConstants.GET_OUTPUT_LOG_SERVICE.length());
}
String part1 = threadName.split(Constants.EQUAL_SIGN)[1];
String prefix = TaskConstants.TASK_LOGGER_INFO_PREFIX + "-";
if (part1.startsWith(prefix)) {
key = part1.substring(prefix.length()).replaceFirst("-", "/");
}
}
logger.debug("task log discriminator end, key is:{}, thread name:{}, loggerName:{}", key, event.getThreadName(),
event.getLoggerName());
return key;
}
可以看到日志文件名是根据 threadName 确认的,同时也要求 logger 名字以 ‘TaskLogLogger’ 开头。
3.4. 应用-ShellTask
线程名是在WorkerTaskExecuteRunnable
的设置的。
public abstract class WorkerTaskExecuteRunnable implements Runnable {
protected WorkerTaskExecuteRunnable(..) {
...
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskLogName(taskLogName);
}
public void run() {
try {
// set the thread name to make sure the log be written to the task log file
Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
实现在LoggerUtils.buildTaskId
,格式为taskAppId=TASK-提交日期-工作流定义code_工作流定义version-工作流实例id-任务实例id
,例如
TASK-20230318-7475262133504_1-1-1
结合TaskLogDiscriminator
的实现,因此日志路径即为:20230318/7475262133504_1-1-1
Logger名则是在任务基类AbstractTask
里定义:
public abstract class AbstractTask {
public static final Marker FINALIZE_SESSION_MARKER = MarkerFactory.getMarker("FINALIZE_SESSION");
protected final Logger logger =
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
FINALIZE_SESSION_MARKER
用于在任务结束时关闭 appender,因此在任务结束时我们就会看到如下日志:
[INFO] 2023-03-19 20:25:02.616 +0800 - FINALIZE_SESSION
综合上面的逻辑,任务日志也就会输出到单独的文件了。
4. 总结
DolphinScheduler的日志里,用到了诸如 MDC、Discriminator、Filter、Marker 等诸多特性,主要是为了两个目的:
- 模块日志可以按照 ProcessInstanceId、TaskInstanceId grep查找
- 任务日志可以输出到单独的文件,没有使用默认的
MDCBasedDiscriminator
,而是自定义了TaskLogDiscriminator
以确保日志更容易检索
因此,这里对于线程名、logger名的设置准确就非常重要了。线程名taskAppId=TASK-提交日期-工作流定义code_工作流定义version-工作流实例id-任务实例id
决定了任务日志的路径,logger名则必须以’TaskLogLogger’开头。
不过同时由于这样的限制,也就不得不实现logHandle
这种用于输出其他类日志的方法:
public void logHandle(LinkedBlockingQueue<String> logs) {
// note that the "new line" is added here to facilitate log parsing
if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
} else {
StringJoiner joiner = new StringJoiner("\n\t");
while (!logs.isEmpty()) {
joiner.add(logs.poll());
}
logger.info(" -> {}", joiner);
}
}
由于日志输出位置变了,会带来一些可读性的降低。