Flink On Yarn 调优

/ flink / 170浏览

现状

目前使用的flink版本为:flink-1.10.1。

向Yarn提交Flink任务时,需要将flink-dist包和用户依赖的相关包上传在hdfs之中。导致任务的启动时间过长。

在YarnClusterDescriptor.startAppMaster方法中可以看到,每次执行时会上传用户相关的jar包到hdfs。

实际工作中,大部分依赖包都是不变的,但是每次提交流都需要上传,这种方式不太合理,我们期望的是不变的依赖只需要上传一次,每次只上传用户编写的代码即可。

方案一

幸运的是,刚有了这个需求,flink发布了1.11.0版本。

先来看下flink1.11.0都有哪些新特性:

Flin支持Application Mode

Flink 1.11.0之前是通过一个单独的客户端来创建 JobGraph 并提交作业的。也就是说,生成JobGrap的动作都发在提交客户端。在实际使用时,会产生下载作业 jar 包、占用客户端机器大量带宽、需要启动单独进程(该进程不受资源管理限制)等问题,这些问题带来的一个影响就是,消耗了客户端资源,对其他客户端工作的任务有一定影响。为了解决这些问题,在 Flink 1.11.0 中提供了一种新的 Application 模式,它将 JobGraph 的生成以及作业的提交转移到 Master 节点进行。

用户可以通过 bin/flink run-application 来使用 application 模式。目前 Application 模式支持 Yarn 和 K8s 的部署方式,Yarn Application 模式会在客户端将运行任务需要的依赖都通过 Yarn Local Resource 传递到 Flink Master,然后在 Master 端进行任务的提交。K8s Application 允许用户构建包含用户 Jar 与依赖的镜像,同时会根据 job 自动创建 TaskManager,并在结束后销毁整个 Cluster。

flink 1.11.0 之前 Flink 在 Yarn 上每提交一个作业都需要上传一次 Flink lib 下的 jars 和用户依赖的jars,从而耗费额外的存储空间和通信带宽。Flink 1.11.0 允许用户提供多个远程的 lib 目录,这些目录下的文件会被缓存到 Yarn 的节点上,从而避免不必要的 Jar 包上传与下载,使提交和启动更快,具体的提交方式如下:

./bin/flink run -m yarn-cluster -d -yD yarn.provided.lib.dirs=hdfs://flink/libs,hdfs://flink/user/libs

此外,flink 1.11.0 还允许用户直接使用远程文件系统上的 Jar 包来创建作业,从而进一步减少 jar 包下载的开销:

./bin/flink run-application -p 10 -t yarn-application -yD yarn.provided.lib.dirs=hdfs://flink/libs,hdfs://flink/user/libs/WordCount.jar

这种方式依赖了上面的特性,即远端进行构建JobGraph 和 任务提交。

其他新特性和本次无关,先不列举了。

对于上面两个特性,可以明确看出,flink帮我们解决了我们当前的需求。即:依赖全部上传在hdfs之中,每次不需要在客户端进行上传。除此之外,每个yarn节点还会缓存依赖,每次任务提交后,分配到的yarn执行节点也不需要再从 hdfs 去拉取jar。既不需要上传,也不需要从hdfs去下载。任务提交速度快了很多。

升级flink版本后,我们的代码如下:

Configuration configuration = new Configuration();
configuration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
// taskmanager 内存大小
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
// jobmanager 内存大小
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1024m"));
// task slots
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 4);
configuration.setBoolean(DeploymentOptions.ATTACHED, true);
// flink dist jar yarn location
configuration.setString(YarnConfigOptions.FLINK_DIST_JAR, "hdfs://nameservice-1/flink/libs/flink-dist_2.11-1.11.0.jar");
// cpu core
configuration.setInteger(YarnConfigOptions.APP_MASTER_VCORES, 2);
// dependency jar dirs
List<String> libDirs = new ArrayList<>();
libDirs.add("hdfs://nameservice-1/flink/libs");
configuration.set(YarnConfigOptions.PROVIDED_LIB_DIRS, libDirs);
// 自定义YarnEnvironment
YarnEnvironment environment = new StreamExecutionEnvironment(configuration, this.getClass().getClassLoader());

DataSource<String> dataSource = environment.fromElements("hello java", "hello flink");
dataSource.print();
JobExecutionResult execute = environment.execute();
execute.getJobID().toString();

任务执行时,需要将hadoop相关的配置放在Resource下。

例如,启动时:

java -classpath "./web.jar:/home/hadoop_conf_dir/::" Application

如果配置了环境变量,也可以使用环境变量:

java -classpath ./web.jar:$HADOOP_CONF_DIR:: Application

方案二

方案一是我们最终的方案。

方案二是一个概念方案,如果不能升flink版本,可以考虑使用此方案。

大家应该能猜得出来。flink1.10的上传和flink1.11的上传,有啥区别呢?我们可以重构flink1.10的YarnClusterDescriptor类,来达到依赖只支持上传一次的目的。