Flink On Yarn 日志输出问题

/ flink / 60浏览

背景

之前在进行 Flink On Yarn 调研 中介绍到,我们的Flink任务提交方式并不是通过shell命令来提交,而是由web接口触发,通过编码方式来提交。

使用编码方式提交任务后,我们打开flink-ui,查看任务执行日志时发现,TaskManager和JobManager的logs面板中均为空(任务停止后可在Yarn中查看详细日志)。如下图:

image-20200722171901073

我们的测试代码如下:

Configuration configuration = new Configuration();
// 设置执行目标为Yarn
configuration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
// 设置TaskManager 内存大小为 1024M、一共4个槽
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
// 设置任务提交后是否继续监听执行过程
configuration.setBoolean(DeploymentOptions.ATTACHED, true);
// 设置Yarn执行环境
ContextEnvironmentFactory factory = new ContextEnvironmentFactory(DefaultExecutorServiceLoader.INSTANCE,configuration,Demo.class.getClassLoader());
ContextEnvironment.setAsContext(factory);

// 测试流
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> dataSource = environment.fromElements("hello java", "hello flink");
dataSource.print();

因为一个Flink任务理论上是执行后永久不会停止的,我们不能等流停止了再去看执行日志。所以log的正常打印尤为重要。

先看flink目录结构:

- conf
    - flink-conf.yaml
    - log4j-cli.properties
    - log4j-console.properties
    - log4j.properties
    - log4j-yarn-session.properties
    - logback-console.xml
    - logback.xml
    - logback-yarn.xml
    - masters
    - slaves
    - sql-client-defaults.yaml
    - zoo.cfg

- lib
    - flink-dist_2.11-1.10.1.jar
    - flink_on_yarn_demo-1.0-SNAPSHOT.jar
    - flink-shaded-hadoop-2-2.4.1-9.0.jar
    - flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    - flink-table_2.11-1.10.1.jar
    - flink-table-blink_2.11-1.10.1.jar
    - log4j-1.2.17.jar
    - slf4j-log4j12-1.7.15.jar

从这个目录结构中可以看出来,flink有两类日志配置文件:log4j和logback。也就是说,我们要在yarn上正常打印出日志,需要这几个log配置文件。并且需要提供log4j相关的jar包。

我们通过shell命令的方式跟踪flink正常任务提交,发现shell提交过来的时候,携带了log4j.properties文件。如下图:

image-20200722174122291

任务提交到yarn之后,在yarn的application下可以看到,会将flink运行环境上传在hdfs,包括log4j.properties 和 log4j.jar。

image-20200722174437786

定位到这一步,基本我们也能猜到,如何做才能让yarn上执行的flink任务正常打印日志。

在揭晓答案前,先简单说明一下这几个log配置文件的作用:

- conf
    - log4j-cli.properties
        # 使用flink客户端提交时使用的配置。即执行:flink run xxx
    - log4j-console.properties
        # 使用flink-console.sh时使用的配置
    - log4j.properties
        # flink on yarn 独立任务时使用的配置
    - log4j-yarn-session.properties
        # flink on yarn session 时使用的配置
    - logback-console.xml
        # 同 log4j-console
    - logback.xml
        # 同 log4j
    - logback-yarn.xml
         # 同 log4j-yarn-session

其中,log4j 和 logback 之间的区别,先留个悬念,后面介绍,因为还有个坑要踩。

了解了这些配置文件的作用,那么让日志输出的方式就是,我们的Config中要携带 log4j 配置文件。代码如下:

Configuration configuration = new Configuration();
configuration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
// taskmanager 内存大小
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
// jobmanager 内存大小
configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, "1024m");
// task slots
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
configuration.setBoolean(DeploymentOptions.ATTACHED, true);
// flink dist jar yarn location
configuration.setString(YarnConfigOptions.FLINK_DIST_JAR, "/home/lizy/flink/flink-1.10.1/lib/flink-dist_2.11-1.10.1.jar");

// user jars (用户编写代码:注意此处,后面会有一个新的坑)
configuration.setInteger(YarnConfigOptions.APP_MASTER_VCORES, 1);
List<String> userJars = new ArrayList<>();
userJars.add("file:///home/lizy/test/flink_on_yarn_demo.jar");
configuration.set(PipelineOptions.JARS, userJars);

// flink 其他依赖。将flink lib 目录下的所有jar上传上去。
List<String> shipDirs = new ArrayList<>();
            shipDirs.add("/home/lizy/flink/flink-1.11.0/lib");
            configuration.set(YarnConfigOptions.SHIP_DIRECTORIES, shipDirs);

configuration.setString(YarnConfigOptions.APPLICATION_NAME, "lizy_test");

// 设置日志文件
configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, "/home/lizy/flink/flink-1.10.1/conf/log4j.properties");

StreamExecutionEnvironment env = new StreamExecutionEnvironment(DefaultExecutorServiceLoader.INSTANCE, configuration, this.getClass().getClassLoader());

通过设置log4j配置文件及上传log4j相关jar后,日志便可“正常”显示。但是,日志显示的位置不对。如下图:

image-20200722171944932

我们期望日志输出在Logs面板下,而实际输出在了Stdout下。Stdout下应该只输出out信息才正常。

我们项目中埋下的坑

先介绍下我们项目:

我们项目使用的log是logback来打印日志的。项目中依赖了logback。

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.3</version>
</dependency>

项目启动后,会先找到:org.slf4j.impl.StaticLoggerBinder 这个类来对日志进行配置。

介绍到这儿,再回头来看下前面提到的,为什么flink conf 目录下会有 log4j 和 logback 两类日志配置呢。其主要区别就在于此。log4j 是使用log4j.jar来进行日志配置。而logback是由logback.jar来进行配置。

flink默认的目录结构中是log4j,因此我们就会看到日志的映射错乱现象。

当然,填这个坑也很容易

方式一:

​ 项目中依赖的日志打印换成 log4j

方式二:

​ 上传在hdfs的日志配置文件换为 logback.xml

两种方式都可以,根据个人情况来决定。我们选择的方式是第二种。更改后,示例代码如下:

Configuration configuration = new Configuration();
configuration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
// taskmanager 内存大小
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
// jobmanager 内存大小
configuration.set(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, "1024m");
// task slots
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
configuration.setBoolean(DeploymentOptions.ATTACHED, true);
// flink dist jar yarn location
configuration.setString(YarnConfigOptions.FLINK_DIST_JAR, "/home/lizy/flink/flink-1.10.1/lib/flink-dist_2.11-1.10.1.jar");

// user jars (用户编写代码:注意此处,后面会有一个新的坑)
configuration.setInteger(YarnConfigOptions.APP_MASTER_VCORES, 1);
List<String> userJars = new ArrayList<>();
userJars.add("file:///home/lizy/test/flink_on_yarn_demo.jar");
configuration.set(PipelineOptions.JARS, userJars);

// flink 其他依赖。将flink lib 目录下的所有jar上传上去。
List<String> shipDirs = new ArrayList<>();
            shipDirs.add("/home/lizy/flink/flink-1.11.0/lib");
            configuration.set(YarnConfigOptions.SHIP_DIRECTORIES, shipDirs);

configuration.setString(YarnConfigOptions.APPLICATION_NAME, "lizy_test");

// 设置日志文件{修改的这儿}
configuration.setString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE, "/home/lizy/flink/flink-1.10.1/conf/logback.xml");

StreamExecutionEnvironment env = new StreamExecutionEnvironment(DefaultExecutorServiceLoader.INSTANCE, configuration, this.getClass().getClassLoader());

至此,flink日志显示就完美解决了。