Flink On Yarn 调研

/ flink / 210浏览

背景

以前的flink任务,是通过remote方式提交在standalone集群中。

standalone存在一些缺点:

作为flink使用者,我们希望flink的各任务是独立互不相关的,某个任务的失败不影响其他的任务。

为了这个目的,考虑将flink任务运行在yarn上。

传统的flink任务提交方式:

./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

该方式是通过命令式的方式来提交任务。而flink的某些使用者,对于flink底层任务运行并不是很了解,而且线上也没有开放权限来提交shell命令,因此,我们需要隐藏提交方式,只提供一个接口给外部,让flink使用者通过页面上点击提交按钮来提交flink任务。

目前flink支持的远程提交方式是:RemoteEnvironment,实现如下:

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("10.0.0.22", 8088, "jars");

该种提交方式,只能提交到flink standalone集群中,但是不能提交到Yarn集群。

既然shell可以提交任务到flink,那么我们来看下,shell是如何把任务提交上去的。

flink shell命令提交任务到Yarn:

./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

在flink shell脚本中,有一些比较关键的代码:

# get flink config
. "$bin"/config.sh

它会去调用config.sh脚本,拿到系统的一些环境变量。如HADOOP_CLASSPATH、YARN_HOME等。

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

该句是用来提交flink任务的入口。把它打印一下,可以看出来它的提交方式如下:

/root/bigdata/jdk/jdk1.8.0_11/bin/java \
-Dlog.file=/root/bigdata/flink/flink-1.10.1/log/flink-root-client-host1.log \
-Dlog4j.configuration=file:/root/bigdata/flink/flink-1.10.1/conf/log4j-cli.properties \
-Dlogback.configurationFile=file:/root/bigdata/flink/flink-1.10.1/conf/logback.xml \
-classpath /root/test/flink_on_yarn_demo-1.0-SNAPSHOT.jar \
    :/root/bigdata/flink/flink-1.10.1/lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar \
    :/root/bigdata/flink/flink-1.10.1/lib/flink-table_2.11-1.10.1.jar \
    :/root/bigdata/flink/flink-1.10.1/lib/flink-table-blink_2.11-1.10.1.jar \
    :/root/bigdata/flink/flink-1.10.1/lib/log4j-1.2.17.jar \
    :/root/bigdata/flink/flink-1.10.1/lib/slf4j-log4j12-1.7.15.jar \
    :/root/bigdata/flink/flink-1.10.1/lib/flink-dist_2.11-1.10.1.jar \
    :/root/bigdata/hadoop/conf:: org.apache.flink.client.cli.CliFrontend run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar

通过打印的真实提交方式可以看到,在提交flink任务到yarn时,向classpath中加入了flink依赖包和hadoop_conf配置。期中,hadoop_conf中指定了yarn的环境信息。

CliFrontend运行逻辑

CliFrontend.main

下面是main方法中的关键代码:

public static void main(final String[] args) {
    /*
     * 1. 获取flink conf 目录
     * 如果系统配置了flink环境变量,则直接拿取系统配置的目录。ConfigConstants.ENV_FLINK_CONF_DIR
     * 如果系统没有配置flink环境变量,则返回当前提交任务的目录的 ./conf 
     */
    final String configurationDirectory = getConfigurationDirectoryFromEnv();

    // 2. 从1返回的目录中读取flink-conf.yaml文件
    final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

    // 3. 执行run
    final CliFrontend cli = new CliFrontend(
        configuration,
        customCommandLines);

    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    int retCode = SecurityUtils.getInstalledContext()
        .runSecured(() -> cli.parseParameters(args));
    System.exit(retCode);
}
CliFrontend.run

下面是run方法的关键代码:

protected void run(String[] args) throws Exception {
    // 加载用户jar包及依赖包
    final List<URL> jobJars = program.getJobJarAndDependencies();
    // 执行任务,期中,effectiveConfiguration为flink-confi中的配置及run时指定的配置
    executeProgram(effectiveConfiguration, program);
}
ClientUtils.executeProgram

run方法执行后,真实提交的地方。部分关键代码:

public static void executeProgram() throws ProgramInvocationException {
    // 用户编写的classLoader
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
        executorServiceLoader,
        configuration,
        userCodeClassLoader);
    // 设置ContextEnvironment上下文
    ContextEnvironment.setAsContext(factory);
    // 反射方式执行用户编写的main函数
    program.invokeInteractiveModeForExecution();
}

从这儿可以看出,flink程序提交的关键是:ContextEnvironment

如果ContextEnvironment是yarn,则提交在yarn集群上,如果是remote,则提交到standalone集群上。

用户程序执行流程

ClientUtils中通过反射执行代码时,将执行用户编写的java程序。

在WordCount实例中,断点查看configuration

ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

可以看出来,flink上下文已经发生了变化。关键信息:

execution.target = yarn-per-job   --> DeploymentOptions.TARGET
pipeline.jars = ["/example/batch/WordCount.jar"]  -->  PipelineOptions.JARS
ExecutionEnvironment.execute

execute用来执行用户代码,部分关键代码:

public JobExecutionResult execute(String jobName) throws Exception {
    // 异步执行,flink on yarn 核心逻辑
    final JobClient jobClient = executeAsync(jobName);

    if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
        // 客户端保持和yarn保持连接,获取yarn上的任务执行过程,默认false
        lastJobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
    } else {
        // 客户端和yarn断开连接,提交后客户端停止,yarn继续执行任务
        lastJobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
    }
    return lastJobExecutionResult;
}
ExecutionEnvironment.executeAsync

上面的execute逻辑主要是用来判断是否监控任务运行过程,实际任务的执行在该方法中。部分核心代码如下:

public JobClient executeAsync(String jobName) throws Exception {
    // 1. 根据用户编写代码,创建执行计划
    final Plan plan = createProgramPlan(jobName);
    // 2. 决定在何处执行。yarn-per-job 对应 YarnJobClusterExecutorFactory
    final PipelineExecutorFactory executorFactory =
        executorServiceLoader.getExecutorFactory(configuration);
    // 3. 真实执行用户代码
    CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
        .getExecutor(configuration)
        .execute(plan, configuration);
}
YarnClusterClientFactory.execute

该方法主要用来创建YarnClient。并通过YarnClient提交任务到Yarn中。核心代码如下:

public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
    // 1. 生成JobGraph
    final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);

    // 2. 创建YarnClient。YarnClient的参数根据提交任务时设置的hadoop_conf来创建
    final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);

    // 3. 获取Yarn上的一些特殊配置
    final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

    // 4. 任务部署到Yarn
    clusterDescriptor.deployJobCluster(
        clusterSpecification, jobGraph, configAccessor.getDetachedMode());
}
YarnClusterClientFactory.deployJobCluster

该方法会做很多事

YarnJobClusterEntrypoint.main

任务成功提交到YARN中后,会执行该函数。clusterEntrypoint.startCluster();

以上便是FlinkOnYarn客户端提交流程。

通过前面的分析,可以大致画一下flink on yarn的流程图。

客户端将代码提交到Yarn之后,Yarn开始申请资源(YARN ResourceManager中的逻辑),申请好资源后,ResourceManager将执行逻辑发送到NodeManager中,在第一个NodeManager中启动FlinkJobManager。

启动入口:ClusterEntrypoint。在本例中,执行:YarnJobClusterEntrypoint.main

上图的流程:

1.YARN RM中的ClientRMService(为普通用户提供的RPC服务组件,处理来自客户端的各种RPC请求,比如查询YARN集群信息,提交、终止应用等)接收到应用提交请求,简单校验后将请求转交给RMAppManager(YARN RM内部管理应用生命周期的组件);

2.RMAppManager根据应用提交上下文内容创建初始状态为NEW的应用,将应用状态持久化到RM状态存储服务(例如ZooKeeper集群,RM状态存储服务用来保证RM重启、HA切换或发生故障后集群应用能够正常恢复,后续流程中的涉及状态存储时不再赘述),应用状态变为NEW_SAVING;

3.应用状态存储完成后,应用状态变为SUBMITTED;RMAppManager开始向ResourceScheduler(YARN RM可拔插资源调度器,YARN自带三种调度器FifoScheduler/FairScheduler/CapacityScheduler,其中CapacityScheduler支持功能最多使用最广泛,FifoScheduler功能最简单基本不可用,今年社区已明确不再继续支持FairScheduler,建议已有用户迁至CapacityScheduler)提交应用,如果无法正常提交(例如队列不存在、不是叶子队列、队列已停用、超出队列最大应用数限制等)则抛出拒绝该应用,应用状态先变为FINAL_SAVING触发应用状态存储流程并在完成后变为FAILED;如果提交成功,应用状态变为ACCEPTED;

4.开始创建应用运行实例(ApplicationAttempt,由于一次运行实例中最重要的组件是ApplicationMaster,下文简称AM,它的状态代表了ApplicationAttempt的当前状态,所以ApplicationAttempt实际也代表了AM),初始状态为NEW;

5.初始化应用运行实例信息,并向ApplicationMasterService(AM&RM协议接口服务,处理来自AM的请求,主要包括注册和心跳)注册,应用实例状态变为SUBMITTED;

6.RMAppManager维护的应用实例开始初始化AM资源申请信息并重新校验队列,然后向ResourceScheduler申请AM Container(Container是YARN中资源的抽象,包含了内存、CPU等多维度资源),应用实例状态变为ACCEPTED;

7.ResourceScheduler会根据优先级(队列/应用/请求每个维度都有优先级配置)从根队列开始层层递进,先后选择当前优先级最高的子队列、应用直至具体某个请求,然后结合集群资源分布等情况作出分配决策,AM Container分配成功后,应用实例状态变为ALLOCATED_SAVING,并触发应用实例状态存储流程,存储成功后应用实例状态变为ALLOCATED;

8.RMAppManager维护的应用实例开始通知ApplicationMasterLauncher(AM生命周期管理服务,负责启动或清理AM container)启动AM container,ApplicationMasterLauncher与YARN NodeManager(下文简称YARN NM,与YARN RM保持通信,负责管理单个节点上的全部资源、Container生命周期、附属服务等,监控节点健康状况和Container资源使用)建立通信并请求启动AM container;

9.ContainerManager(YARN NM核心组件,管理所有Container的生命周期)接收到AM container启动请求,YARN NM开始校验Container Token及资源文件,创建应用实例和Container实例并存储至本地,结果返回后应用实例状态变为LAUNCHED;

10.ResourceLocalizationService(资源本地化服务,负责Container所需资源的本地化。它能够按照描述从HDFS上下载Container所需的文件资源,并尽量将它们分摊到各个磁盘上以防止出现访问热点)初始化各种服务组件、创建工作目录、从HDFS下载运行所需的各种资源至Container工作目录(路径为: ${yarn.nodemanager.local-dirs}/usercache/${user}/appcache//);

11.ContainersLauncher(负责container的具体操作,包括启动、重启、恢复和清理等)将待运行Container所需的环境变量和运行命令写到Container工作目录下的launch_container.sh脚本中,然后运行该脚本启动Container;

12.Container进程加载并运行ClusterEntrypoint(Flink JobManager入口类,每种集群部署模式和应用运行模式都有相应的实现,例如在YARN集群部署模式下,per-job应用运行模式实现类是YarnJobClusterEntrypoint,session应用运行模式实现类是YarnSessionClusterEntrypoint),首先初始化相关运行环境:

13.启动ResourceManager(Flink资源管理核心组件,包含YarnResourceManager和SlotManager两个子组件,YarnResourceManager负责外部资源管理,与YARN RM建立通信并保持心跳,申请或释放TaskManager资源,注销应用等;SlotManager则负责内部资源管理,维护全部Slot信息和状态)及相关服务,创建异步AMRMClient,开始注册AM,注册成功后每隔一段时间(心跳间隔配置项:${yarn.heartbeat.interval},默认5s)向YARN RM发送心跳来发送资源更新请求和接受资源变更结果。YARN RM内部该应用和应用运行实例的状态都变为RUNNING,并通知AMLivelinessMonitor服务监控AM是否存活状态,当心跳超过一定时间(默认10分钟)触发AM failover流程;

14.启动Dispatcher(负责接收用户提供的作业,并且负责为这个新提交的作业拉起一个新的 JobManager)及相关服务(包括REST endpoint等),在per-job运行模式下,Dispatcher将直接从Container工作目录加载JobGraph文件;在session运行模式下,Dispatcher将在接收客户端提交的Job(_通过BlockServer接收job graph文件)后再进行后续流程;

15.根据JobGraph启动JobManager(负责作业调度、管理Job和Task的生命周期),构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构);

16.JobManager开始执行ExecutionGraph,向ResourceManager申请资源;

17.ResourceManager将资源请求加入等待请求队列,并通过心跳向YARN RM申请新的Container资源来启动TaskManager进程;后续流程如果有空闲Slot资源,SlotManager将其分配给等待请求队列中匹配的请求,不用再通过18. YarnResourceManager申请新的TaskManager;

18.YARN ApplicationMasterService接收到资源请求后,解析出新的资源请求并更新应用请求信息;

19.YARN ResourceScheduler成功为该应用分配资源后更新应用信息,ApplicationMasterService接收到Flink JobManager的下一次心跳时返回新分配资源信息;

20.Flink ResourceManager接收到新分配的Container资源后,准备好TaskManager启动上下文(ContainerLauncherContext,生成TaskManager配置并上传至分布式存储,配置其他依赖和环境变量等),然后向YARN NM申请启动TaskManager进程,YARN NM启动Container的流程与AM Container启动流程基本类似,区别在于应用实例在NM上已存在并未RUNNING状态时则跳过应用实例初始化流程,这里不再赘述;

21.TaskManager进程加载并运行YarnTaskExecutorRunner(Flink TaskManager入口类),初始化流程完成后启动TaskExecutor(负责执行Task相关操作);

22.TaskExecutor启动后先向ResourceManager注册,成功后再向SlotManager汇报自己的Slot资源与状态;
SlotManager接收到Slot空闲资源后主动触发Slot分配,从等待请求队列中选出合适的资源请求后,向
TaskManager请求该Slot资源

23.TaskManager收到请求后检查该Slot是否可分配(不存在则返回异常信息)、Job是否已注册(没有则先注册再分配Slot),检查通过后将Slot分配给JobManager;

24.JobManager检查Slot分配是否重复,通过后通知Execution执行部署task流程,向TaskExecutor提交task;
TaskExecutor启动新的线程运行Task。

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.yarn.executors.YarnJobClusterExecutor;

public class Demo {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
        configuration.setBoolean(DeploymentOptions.ATTACHED, true);

        ContextEnvironmentFactory factory = new ContextEnvironmentFactory(DefaultExecutorServiceLoader.INSTANCE,
                configuration,
                Demo.class.getClassLoader());
        ContextEnvironment.setAsContext(factory);
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource = environment.fromElements("hello java", "hello flink");
        dataSource.print();
    }

}

原始执行方式:

/root/bigdata/jdk/jdk1.8.0_11/bin/java -Dlog.file=/root/bigdata/flink/flink-1.10.1/log/flink-root-client-host1.log -Dlog4j.configuration=file:/root/bigdata/flink/flink-1.10.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/root/bigdata/flink/flink-1.10.1/conf/logback.xml -classpath /root/test/flink_on_yarn_demo-1.0-SNAPSHOT.jar:/root/bigdata/flink/flink-1.10.1/lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:/root/bigdata/flink/flink-1.10.1/lib/flink-table_2.11-1.10.1.jar:/root/bigdata/flink/flink-1.10.1/lib/flink-table-blink_2.11-1.10.1.jar:/root/bigdata/flink/flink-1.10.1/lib/log4j-1.2.17.jar:/root/bigdata/flink/flink-1.10.1/lib/slf4j-log4j12-1.7.15.jar:/root/bigdata/flink/flink-1.10.1/lib/flink-dist_2.11-1.10.1.jar:/root/bigdata/hadoop/conf:: Demo

打包依赖后执行方式:

dependencies {
    include(dependency('org.apache.flink:flink-.*:'))
    include(dependency('com.google.protobuf:protobuf-java:'))
    include(dependency('org.slf4j:slf4j-log4j12:'))
    include(dependency('org.slf4j:slf4j-api:'))
    include(dependency('log4j:log4j:'))
    include(dependency('commons-lang:commons-lang:'))
    include(dependency('commons-logging:commons-logging:'))
    include(dependency('commons-io:commons-io:'))
    include(dependency('commons-collections:commons-collections:'))
    include(dependency('commons-cli:commons-cli:'))
    include(dependency('commons-configuration:commons-configuration:'))
    include(dependency('org.apache.commons:commons-lang3:'))
    include(dependency('com.esotericsoftware.kryo:kryo:'))
    include(dependency('org.scala-lang:scala-library:'))
}
java -classpath flink_on_yarn_demo-1.0-SNAPSHOT-all.jar:/root/bigdata/hadoop/conf:: Demo /root/bigdata/flink/flink-1.10.1/lib/flink-dist_2.11-1.10.1.jar

说明:
将程序执行所依赖的jars打入一个包,运行程序。Demo后的参数是指flink-dist包地址,该包将会上传到yarn上去执行flink任务。java代码:

configuration.setString(YarnConfigOptions.FLINK_DIST_JAR, args[0]);

Flink提交到Yarn上,有一些参数可以来设置。比如JobManager内存大小、TaskManager内存大小等。

参数 常量 默认值 说明
execution.target DeploymentOptions.TARGET no flink程序执行目标。PipelineExecutor子类。
可选值:local、remote、yarn-per-job、yarn-session、kubernetes-session等。其中,yarn、k8s需要依赖对应jar
execution.attached DeploymentOptions.ATTACHED false 提交任务方式。
false时,提交任务后,客户端和Yarn断开连接。
true时,提交任务后,保持连接,获取任务在Yarn上的执行过程
yarn.appmaster.vcores YarnConfigOptions.APP_MASTER_VCORES 1 yarn application master 核数
yarn.flink-dist-jar YarnConfigOptions.FLINK_DIST_JAR no 上传在yarn中的flink核心包
yarn.application.name YarnConfigOptions.APPLICATION_NAME no Flink应用程序在Yarn上执行的名称
taskmanager.heap.size TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY no taskmanager堆内存
taskmanager.heap.mb TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB no taskmanager堆内存,MB
taskmanager.memory.process.size TaskManagerOptions.TOTAL_PROCESS_MEMORY no taskmanager进程总内存
jobmanager.heap.size JobManagerOptions.JOB_MANAGER_HEAP_MEMORY no jobmanager堆内存大小
jobmanager.heap.mb JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB no jobmanager堆内存大小,MB
jobmanager.memory.process.size JobManagerOptions.TOTAL_PROCESS_MEMORY no jobmanager进程内存大小