How Flink on YARN Controls the Number of TaskManagers

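Before Flink 1.5, when submitting a Flink job to YARN, you could set the number of TaskManagers on YARN directly with the -yn parameter. Since Flink 1.5, this parameter has been deprecated; instead, the number of TaskManagers is determined dynamically from the job's parallelism.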
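If the TaskManager count is fixed manually, consider per-job mode with a parallelism of 1, 4 TaskManagers, and 2 slots per TaskManager: only 1 TaskManager and 1 slot are actually used, leaving 3 TaskManagers and 7 slots idle, which wastes a great deal of resources. Moreover, a per-job cluster cannot accept newly submitted jobs, so those idle resources can never be put to use. This is why Flink 1.5 made the change.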
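The arithmetic behind that example can be checked with a short Java sketch. FixedTaskManagerWaste and its methods are hypothetical names, and the sketch assumes the used slots are packed onto as few TaskManagers as possible, which is how the example above counts them.

```java
// Hypothetical helper: slot and TaskManager waste when the TaskManager count is
// fixed up front (the pre-1.5 -yn style), for the example in the text.
class FixedTaskManagerWaste {

    // A job with parallelism p occupies at most p slots (assumes default slot sharing).
    static int usedSlots(int taskManagers, int slotsPerTm, int parallelism) {
        return Math.min(parallelism, taskManagers * slotsPerTm);
    }

    // Slots that are allocated on YARN but never used.
    static int idleSlots(int taskManagers, int slotsPerTm, int parallelism) {
        return taskManagers * slotsPerTm - usedSlots(taskManagers, slotsPerTm, parallelism);
    }

    // TaskManagers with no busy slot, assuming used slots are packed onto as few
    // TaskManagers as possible.
    static int idleTaskManagers(int taskManagers, int slotsPerTm, int parallelism) {
        int used = usedSlots(taskManagers, slotsPerTm, parallelism);
        int busy = (used + slotsPerTm - 1) / slotsPerTm; // integer ceiling
        return taskManagers - busy;
    }

    public static void main(String[] args) {
        int taskManagers = 4, slotsPerTm = 2, parallelism = 1; // values from the text
        System.out.println("idle slots: " + idleSlots(taskManagers, slotsPerTm, parallelism));
        System.out.println("idle TaskManagers: " + idleTaskManagers(taskManagers, slotsPerTm, parallelism));
    }
}
```

Running it prints 7 idle slots and 3 idle TaskManagers, matching the figures above.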
Num(TaskManager) = Math.ceil(P/C);
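where P is the parallelism (with the default slot sharing, the largest parallelism of any operator in the job) and C is the number of slots per TaskManager, determined by TaskManagerOptions.NUM_TASK_SLOTS (configuration key taskmanager.numberOfTaskSlots).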
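A minimal Java sketch of the formula above. TaskManagerCount is a hypothetical class, not part of Flink. One detail worth noting: if P and C are Java int values, Math.ceil(P/C) performs integer division before rounding, so Math.ceil(7/2) yields 3.0 rather than 4.0. The sketch therefore uses the integer-ceiling form (P + C - 1) / C.

```java
// Hypothetical helper computing Num(TaskManager) = ceil(P / C) with integer arithmetic.
class TaskManagerCount {

    static int numTaskManagers(int parallelism, int slotsPerTaskManager) {
        if (parallelism <= 0 || slotsPerTaskManager <= 0) {
            throw new IllegalArgumentException("parallelism and slots must be positive");
        }
        // Integer ceiling: avoids the truncation of Math.ceil(int / int).
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        System.out.println(numTaskManagers(8, 2));  // 4
        System.out.println(numTaskManagers(1, 2));  // 1
        System.out.println(numTaskManagers(7, 2));  // 4
        System.out.println((int) Math.ceil(7 / 2)); // 3: 7 / 2 is truncated before ceil
    }
}
```

With the settings used later in this post (P = 8, C = 2) it gives 4 TaskManagers, which matches the four TaskManager containers in the log.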

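The process by which Flink requests resources from YARN can be roughly followed in the logs, and you can use the logs to trace through the corresponding source code.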
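To make the log easier to follow, here is a deliberately simplified Java model of how slot requests might turn into TaskExecutor container requests. It is an assumption, not Flink's SlotManager implementation: each slot request takes a free slot on a registered TaskManager if one exists, otherwise a not-yet-assigned slot of an already requested (pending) worker, otherwise it triggers a new worker request that brings C slots. WorkerRequestModel and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified, hypothetical model of slot requests turning into worker (container)
// requests. It is NOT Flink's SlotManager code, only an approximation of the log.
class WorkerRequestModel {

    // A requested worker (YARN container) that has not registered yet.
    private static final class PendingWorker {
        int unassignedSlots;
        PendingWorker(int unassignedSlots) { this.unassignedSlots = unassignedSlots; }
    }

    private final int slotsPerWorker;                        // C, taskmanager.numberOfTaskSlots
    private int freeRegisteredSlots = 0;                     // free slots on running TaskManagers
    private final Deque<PendingWorker> pending = new ArrayDeque<>(); // oldest first
    final List<Integer> pendingCounts = new ArrayList<>();   // pending count after each new request
    int workersRequested = 0;

    WorkerRequestModel(int slotsPerWorker) { this.slotsPerWorker = slotsPerWorker; }

    // One slot request from the JobMaster's slot pool.
    void requestSlot() {
        if (freeRegisteredSlots > 0) {          // 1. a running TaskManager has a free slot
            freeRegisteredSlots--;
            return;
        }
        for (PendingWorker w : pending) {       // 2. a requested worker still has room
            if (w.unassignedSlots > 0) {
                w.unassignedSlots--;
                return;
            }
        }
        // 3. request a new worker; this request takes one of its C slots
        pending.addLast(new PendingWorker(slotsPerWorker - 1));
        workersRequested++;
        pendingCounts.add(pending.size());      // cf. "Number pending workers of this resource is N"
    }

    // The oldest pending worker starts and registers; its unassigned slots become free.
    void oldestWorkerRegistered() {
        PendingWorker w = pending.pollFirst();
        if (w != null) {
            freeRegisteredSlots += w.unassignedSlots;
        }
    }

    public static void main(String[] args) {
        WorkerRequestModel model = new WorkerRequestModel(2); // 2 slots per TaskManager
        model.requestSlot();                 // first slot request, before any TaskManager exists
        model.oldestWorkerRegistered();      // first container registers with 2 slots
        for (int i = 0; i < 7; i++) {        // remaining 7 of the 8 slot requests
            model.requestSlot();
        }
        System.out.println(model.pendingCounts);    // [1, 1, 2, 3]
        System.out.println(model.workersRequested); // 4
    }
}
```

Replaying the sequence from the log below (one slot request, the first container registering, then the remaining seven requests) yields the pending-worker counts 1, 1, 2, 3 seen in the "Number pending workers of this resource is N" lines, and 4 workers in total.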

Resources set at startup:
JobManager memory: 1024M
TaskManager memory: 1024M
Slots per TaskManager: 2
Global job parallelism: 8
JobManager cores: 2
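The 1024M TaskManager memory is the total process size (it appears in the log below as taskmanager.memory.process.size, 1 gb), and each TaskManager is requested as a YARN container of <memory:1024, vCores:2>. The log's TaskExecutorProcessSpec lines show how Flink split that budget. This Java sketch (TaskManagerMemoryBreakdown is a hypothetical name) adds up the byte values copied from those lines to confirm they total exactly 1024 MB.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sums the TaskExecutorProcessSpec components printed in the log; the byte values
// are copied from the "TaskExecutor ... will be started on ..." lines.
class TaskManagerMemoryBreakdown {

    static Map<String, Long> componentsBytes() {
        Map<String, Long> m = new LinkedHashMap<>();
        m.put("frameworkHeapSize", 134_217_728L);    // 128 MB
        m.put("frameworkOffHeapSize", 134_217_728L); // 128 MB
        m.put("taskHeapSize", 26_843_542L);          // 25.6 MB
        m.put("taskOffHeapSize", 0L);
        m.put("networkMemSize", 67_108_864L);        // 64 MB
        m.put("managedMemorySize", 241_591_914L);    // 230.4 MB
        m.put("jvmMetaspaceSize", 268_435_456L);     // 256 MB
        m.put("jvmOverheadSize", 201_326_592L);      // 192 MB
        return m;
    }

    static long totalBytes() {
        long total = 0;
        for (long v : componentsBytes().values()) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        long total = totalBytes();
        System.out.println(total + " bytes = " + total / (1024 * 1024) + " MB");
    }
}
```

It prints 1073741824 bytes = 1024 MB, i.e. the whole 1024M process size is accounted for by these components.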

Some keywords to search for in the log:

Requesting new TaskExecutor
Received
Starting TaskManagers

20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: yarn.appmaster.vcores, 2
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.process.size, 1 gb
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: internal.jobgraph-path, job.graph
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: yarn.flink-dist-jar, hdfs://nameservice-1/flink/libs/flink-dist_2.11-1.11.0.jar
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: execution.attached, true
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: yarn.provided.lib.dirs, hdfs://nameservice-1/flink/libs
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: internal.cluster.execution-mode, NORMAL
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: high-availability.cluster-id, application_1594339882219_0111
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.numberOfTaskSlots, 2
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: execution.target, yarn-per-job
20/07/24 11:08:51 INFO configuration.GlobalConfiguration: Loading configuration property: jobmanager.memory.process.size, 1 gb
20/07/24 11:08:51 INFO externalresource.ExternalResourceUtils: Enabled external resources: []
20/07/24 11:08:51 INFO runner.JobDispatcherLeaderProcess: Start JobDispatcherLeaderProcess.
20/07/24 11:08:51 INFO akka.AkkaRpcService: Starting RPC endpoint for org.apache.flink.runtime.dispatcher.MiniDispatcher at akka://flink/user/rpc/dispatcher_1 .
20/07/24 11:08:51 INFO akka.AkkaRpcService: Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
20/07/24 11:08:51 INFO jobmaster.JobMaster: Initializing job Flink Streaming Job (f4855944456046f26720bce0c7175370).
20/07/24 11:08:51 INFO client.RMProxy: Connecting to ResourceManager at cos6-data1.test.allsenseww.com/10.0.0.22:8030
20/07/24 11:08:51 INFO jobmaster.JobMaster: Using restart back off time strategy NoRestartBackoffTimeStrategy for Flink Streaming Job (f4855944456046f26720bce0c7175370).
20/07/24 11:08:51 INFO jobmaster.JobMaster: Running initialization on master for job Flink Streaming Job (f4855944456046f26720bce0c7175370).
20/07/24 11:08:51 INFO jobmaster.JobMaster: Successfully ran initialization on master in 0 ms.
20/07/24 11:08:51 INFO adapter.DefaultExecutionTopology: Built 1 pipelined regions in 1 ms
20/07/24 11:08:51 INFO jobmaster.JobMaster: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
20/07/24 11:08:51 INFO jobmaster.JobMaster: Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@21d91fa6 for Flink Streaming Job (f4855944456046f26720bce0c7175370).
20/07/24 11:08:51 INFO jobmaster.JobManagerRunnerImpl: JobManager runner for job Flink Streaming Job (f4855944456046f26720bce0c7175370) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://flink@cos6-data4.test.allsenseww.com:60558/user/rpc/jobmanager_2.
20/07/24 11:08:51 INFO jobmaster.JobMaster: Starting execution of job Flink Streaming Job (f4855944456046f26720bce0c7175370) under job master id 00000000000000000000000000000000.
20/07/24 11:08:51 INFO jobmaster.JobMaster: Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Job Flink Streaming Job (f4855944456046f26720bce0c7175370) switched from state CREATED to RUNNING.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Source: Collection Source (1/1) (2469ad8ed9a2551a9a47dfd95a21c021) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (1/8) (3e1eb239730790e315211f9ae350b47c) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (2/8) (b3d23507fcdfd00f0be329a30cc3a216) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (3/8) (39d6ca9e3a4e418ec73cb8f03e61ab7d) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (4/8) (c3c2929ca48916a04496b7bee2acfe41) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (5/8) (27b97bb4d644108767b24d2d527ff9c3) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (6/8) (124943dd3f49b57e8167a11fe6756f53) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (7/8) (83ceb8df723e60e9b4327a681e0c4f7a) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO executiongraph.ExecutionGraph: Sink: Print to Std. Out (8/8) (29cffd19a687a2cdba681b204cc85893) switched from CREATED to SCHEDULED.
20/07/24 11:08:51 INFO slotpool.SlotPoolImpl: Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{39da3deb86f378a5aaaf61f9e9d5e45a}]
20/07/24 11:08:51 INFO jobmaster.JobMaster: Connecting to ResourceManager akka.tcp://flink@cos6-data4.test.allsenseww.com:60558/user/rpc/resourcemanager_*(00000000000000000000000000000000)
20/07/24 11:08:52 INFO yarn.YarnResourceManager: Recovered 0 containers from previous attempts ([]).
20/07/24 11:08:52 INFO yarn.YarnResourceManager: Register application master response contains scheduler resource types: [MEMORY, CPU].
20/07/24 11:08:52 INFO yarn.YarnResourceManager: Container matching strategy: MATCH_VCORE.
20/07/24 11:08:52 INFO impl.NMClientAsyncImpl: Upper bound of the thread pool size is 500
20/07/24 11:08:52 INFO yarn.YarnResourceManager: ResourceManager akka.tcp://flink@cos6-data4.test.allsenseww.com:60558/user/rpc/resourcemanager_0 was granted leadership with fencing token 00000000000000000000000000000000
20/07/24 11:08:52 INFO slotmanager.SlotManagerImpl: Starting the SlotManager.
20/07/24 11:08:52 INFO jobmaster.JobMaster: Resolved ResourceManager address, beginning registration
20/07/24 11:08:52 INFO yarn.YarnResourceManager: Registering job manager 00000000000000000000000000000000@akka.tcp://flink@cos6-data4.test.allsenseww.com:60558/user/rpc/jobmanager_2 for job f4855944456046f26720bce0c7175370.
20/07/24 11:08:52 INFO yarn.YarnResourceManager: Registered job manager 00000000000000000000000000000000@akka.tcp://flink@cos6-data4.test.allsenseww.com:60558/user/rpc/jobmanager_2 for job f4855944456046f26720bce0c7175370.
20/07/24 11:08:52 INFO jobmaster.JobMaster: JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
20/07/24 11:08:52 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{39da3deb86f378a5aaaf61f9e9d5e45a}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:08:52 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id 32d0b101825c970ade797298766e41e0.
20/07/24 11:08:52 INFO conf.Configuration: resource-types.xml not found
20/07/24 11:08:52 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/07/24 11:08:52 INFO yarn.YarnResourceManager: Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=2.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes)}. Number pending workers of this resource is 1.
20/07/24 11:08:57 INFO yarn.YarnResourceManager: Received 1 containers.
20/07/24 11:08:57 INFO yarn.YarnResourceManager: Received 1 containers with resource <memory:1024, vCores:2>, 1 pending container requests.
20/07/24 11:08:57 INFO yarn.YarnResourceManager: TaskExecutor container_1594339882219_0111_01_000002 will be started on cos6-data4.test.allsenseww.com with TaskExecutorProcessSpec {cpuCores=2.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemorySize=230.400mb (241591914 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
20/07/24 11:08:57 INFO yarn.YarnResourceManager: Creating container launch context for TaskManagers
20/07/24 11:08:57 INFO yarn.YarnResourceManager: Starting TaskManagers
20/07/24 11:08:57 INFO yarn.YarnResourceManager: Removing container request Capability[<memory:1024, vCores:2>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}].
20/07/24 11:08:57 INFO yarn.YarnResourceManager: Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:1024, vCores:2>.
20/07/24 11:08:57 INFO impl.NMClientAsyncImpl: Processing Event EventType: START_CONTAINER for Container container_1594339882219_0111_01_000002
20/07/24 11:09:02 INFO yarn.YarnResourceManager: Registering TaskManager with ResourceID container_1594339882219_0111_01_000002 (akka.tcp://flink@cos6-data4.test.allsenseww.com:53429/user/rpc/taskmanager_0) at ResourceManager
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{8df53252df1121e16c6d7b93cb6fbc62}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id 3e55d5ec60e5aadfd6721194d465335c.
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{219e03e9fbef6941a38575bbab6de3ca}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{387964ea27992d5dd701b4dd69800407}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id 7d88f4ef6429d294ebc11447990180e9.
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{ab62791b083311f7322a37c96e496352}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=2.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes)}. Number pending workers of this resource is 1.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id 8e54cc8fd31b9492864a4fd105ee718f.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id 5f74a8f058132bb99b69ceb5412f58ce.
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{a59b18414b858d522e9256a64b1030a2}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=2.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes)}. Number pending workers of this resource is 2.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id d47fd0432a38c0e9c23a021a1e69bb76.
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{e43dea8f4b2fdfbe7f945d155f29564c}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id 8f5a2e8e5679ba49674bbe14e6c5979d.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Requesting new TaskExecutor container with resource WorkerResourceSpec {cpuCores=2.0, taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemSize=230.400mb (241591914 bytes)}. Number pending workers of this resource is 3.
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Requesting new slot [SlotRequestId{555501b4adfae98c7e45e3f7977980fe}] and profile ResourceProfile{UNKNOWN} from resource manager.
20/07/24 11:09:03 INFO yarn.YarnResourceManager: Request slot with profile ResourceProfile{UNKNOWN} for job f4855944456046f26720bce0c7175370 with allocation id 7fea60b111239bacf989ac426b21aea8.
20/07/24 11:09:03 INFO slotpool.SlotPoolImpl: Received repeated offer for slot [32d0b101825c970ade797298766e41e0]. Ignoring.
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Received 3 containers.
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Received 3 containers with resource <memory:1024, vCores:2>, 3 pending container requests.
20/07/24 11:09:04 INFO yarn.YarnResourceManager: TaskExecutor container_1594339882219_0111_01_000004 will be started on cos6-data5.test.allsenseww.com with TaskExecutorProcessSpec {cpuCores=2.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemorySize=230.400mb (241591914 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Creating container launch context for TaskManagers
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Starting TaskManagers
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Removing container request Capability[<memory:1024, vCores:2>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}].
20/07/24 11:09:04 INFO yarn.YarnResourceManager: TaskExecutor container_1594339882219_0111_01_000005 will be started on cos6-data5.test.allsenseww.com with TaskExecutorProcessSpec {cpuCores=2.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemorySize=230.400mb (241591914 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Creating container launch context for TaskManagers
20/07/24 11:09:04 INFO impl.NMClientAsyncImpl: Processing Event EventType: START_CONTAINER for Container container_1594339882219_0111_01_000004
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Starting TaskManagers
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Removing container request Capability[<memory:1024, vCores:2>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}].
20/07/24 11:09:04 INFO yarn.YarnResourceManager: TaskExecutor container_1594339882219_0111_01_000006 will be started on cos6-data3.test.allsenseww.com with TaskExecutorProcessSpec {cpuCores=2.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=25.600mb (26843542 bytes), taskOffHeapSize=0 bytes, networkMemSize=64.000mb (67108864 bytes), managedMemorySize=230.400mb (241591914 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}.
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Creating container launch context for TaskManagers
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Starting TaskManagers
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Removing container request Capability[<memory:1024, vCores:2>]Priority[1]AllocationRequestId[0]ExecutionTypeRequest[{Execution Type: GUARANTEED, Enforce Execution Type: false}].
20/07/24 11:09:04 INFO yarn.YarnResourceManager: Accepted 3 requested containers, returned 0 excess containers, 0 pending container requests of resource <memory:1024, vCores:2>.
20/07/24 11:09:04 INFO impl.NMClientAsyncImpl: Processing Event EventType: START_CONTAINER for Container container_1594339882219_0111_01_000005
20/07/24 11:09:04 INFO impl.NMClientAsyncImpl: Processing Event EventType: START_CONTAINER for Container container_1594339882219_0111_01_000006
20/07/24 11:09:09 INFO yarn.YarnResourceManager: Registering TaskManager with ResourceID container_1594339882219_0111_01_000006 (akka.tcp://flink@cos6-data3.test.allsenseww.com:58254/user/rpc/taskmanager_0) at ResourceManager
20/07/24 11:09:10 INFO yarn.YarnResourceManager: Registering TaskManager with ResourceID container_1594339882219_0111_01_000005 (akka.tcp://flink@cos6-data5.test.allsenseww.com:56155/user/rpc/taskmanager_0) at ResourceManager
20/07/24 11:09:10 INFO yarn.YarnResourceManager: Registering TaskManager with ResourceID container_1594339882219_0111_01_000004 (akka.tcp://flink@cos6-data5.test.allsenseww.com:43775/user/rpc/taskmanager_0) at ResourceManager
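The keywords listed above can also be counted programmatically. The following Java sketch (ContainerCounter is a hypothetical name) reads a JobManager log file passed as the first argument, or falls back to a few lines copied verbatim from the log above. It sums N over every "Received N containers." line and counts the lines that contain a given keyword.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that scans a JobManager log for the keywords above.
class ContainerCounter {

    // Matches only the summary line "Received N containers." (not "... containers with resource ...").
    private static final Pattern RECEIVED = Pattern.compile("Received (\\d+) containers\\.$");

    // A few lines copied verbatim from the log, used when no file is given.
    static final List<String> SAMPLE = Arrays.asList(
        "20/07/24 11:08:57 INFO yarn.YarnResourceManager: Received 1 containers.",
        "20/07/24 11:08:57 INFO yarn.YarnResourceManager: Received 1 containers with resource <memory:1024, vCores:2>, 1 pending container requests.",
        "20/07/24 11:08:57 INFO yarn.YarnResourceManager: Starting TaskManagers",
        "20/07/24 11:09:04 INFO yarn.YarnResourceManager: Received 3 containers.",
        "20/07/24 11:09:04 INFO yarn.YarnResourceManager: Received 3 containers with resource <memory:1024, vCores:2>, 3 pending container requests.",
        "20/07/24 11:09:04 INFO yarn.YarnResourceManager: Starting TaskManagers",
        "20/07/24 11:09:04 INFO yarn.YarnResourceManager: Starting TaskManagers",
        "20/07/24 11:09:04 INFO yarn.YarnResourceManager: Starting TaskManagers");

    // Sum of N over all "Received N containers." lines.
    static int receivedContainers(List<String> lines) {
        int total = 0;
        for (String line : lines) {
            Matcher m = RECEIVED.matcher(line);
            if (m.find()) {
                total += Integer.parseInt(m.group(1));
            }
        }
        return total;
    }

    // Number of lines containing the keyword.
    static long countContaining(List<String> lines, String keyword) {
        return lines.stream().filter(l -> l.contains(keyword)).count();
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = args.length > 0 ? Files.readAllLines(Paths.get(args[0])) : SAMPLE;
        System.out.println("containers received: " + receivedContainers(lines));
        System.out.println("Starting TaskManagers: " + countContaining(lines, "Starting TaskManagers"));
        System.out.println("Requesting new TaskExecutor: " + countContaining(lines, "Requesting new TaskExecutor"));
    }
}
```

Run against the full log above, each of the three counts should come to 4, which agrees with ceil(8 / 2) = 4 TaskManagers for a parallelism of 8 and 2 slots per TaskManager.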