Flink On Yarn 并发问题采坑记

/ flink / 90浏览

背景

前几篇已经介绍了Flink On Yarn的方法。由于flink任务的提交方式是通过java web来提交的,并发肯定是避免不了的。单纯两个并发时,就有一个job会提交失败。失败日志如下:

java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:707) ~[flink-shaded-hadoop-2-2.4.1-9.0.jar!/:2.4.1-9.0]
        at org.apache.hadoop.hdfs.DFSClient.setPermission(DFSClient.java:2127) ~[flink-shaded-hadoop-2-2.4.1-9.0.jar!/:2.4.1-9.0]
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1266) ~[flink-shaded-hadoop-2-2.4.1-9.0.jar!/:2.4.1-9.0]
        at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1262) ~[flink-shaded-hadoop-2-2.4.1-9.0.jar!/:2.4.1-9.0]
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-2.4.1-9.0.jar!/:2.4.1-9.0]
        at org.apache.hadoop.hdfs.DistributedFileSystem.setPermission(DistributedFileSystem.java:1262) ~[flink-shaded-hadoop-2-2.4.1-9.0.jar!/:2.4.1-9.0]
        at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:838) ~[flink-yarn_2.11-1.10.1.jar!/:1.10.1]
        at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:488) ~[flink-yarn_2.11-1.10.1.jar!/:1.10.1]
        at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:391) ~[flink-yarn_2.11-1.10.1.jar!/:1.10.1]
        at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70) ~[flink-clients_2.11-1.10.1.jar!/:1.10.1]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1733) ~[flink-streaming-java_2.11-1.10.1.jar!/:1.10.1]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634) ~[flink-streaming-java_2.11-1.10.1.jar!/:1.10.1]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620) ~[flink-streaming-java_2.11-1.10.1.jar!/:1.10.1]
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602) ~[flink-streaming-java_2.11-1.10.1.jar!/:1.10.1]
        at com.wdzaslzy.flink.controller.FlinkController.run(FlinkController.java:51) ~[classes!/:1.0-SNAPSHOT]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:888) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:634) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:108) ~[spring-boot-actuator-2.2.2.RELEASE.jar!/:2.2.2.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.2.2.RELEASE.jar!/:5.2.2.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:526) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:367) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:860) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1591) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_191]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_191]
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.29.jar!/:9.0.29]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]

源码追踪

根据日志提供的信息,在 DFSClient下有一个checkOpen()方法。

void checkOpen() throws IOException {
    if (!this.clientRunning) {
        IOException result = new IOException("Filesystem closed");
        throw result;
    }
}

可以看到,在clientRunning为false时,会抛出该异常。在该文件中搜索clientRunning的更新情况,发现在close()方法中会将clientRunning设置为false。

public synchronized void close() throws IOException {
    if (this.clientRunning) {
        this.closeAllFilesBeingWritten(false);
        this.clientRunning = false;
        this.getLeaseRenewer().closeClient(this);
        this.closeConnectionToNamenode();
    }
}

此时猜想,两个线程并发时,A线程关闭,导致B线程也不能正常运行。那么,调用close方法的入口就尤为重要。

通过断点跟踪,发现在YarnClusterDescriptor.startAppMaster()中调用了。

final FileSystem fs = FileSystem.get(yarnConfiguration);
//部分代码省略
fs.close();

到此处,因此可以猜测,在多线程环境中,所创建出来的FileSystem是同一个时,才有可能互相影响。

于是跟踪FileSystem.get()方法时看到:

public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();
    if (scheme == null && authority == null) {
        return get(conf);
    } else {
        if (scheme != null && authority == null) {
            URI defaultUri = getDefaultUri(conf);
            if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
                return get(defaultUri, conf);
            }
        }
        //核心代码
        String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
        return conf.getBoolean(disableCacheName, false) ? createFileSystem(uri, conf) : CACHE.get(uri, conf);
    }
}

从核心代码处可以看出来,在开启cache的情况下时,优先从CACHE中获取FileSystem。这样就导致,在多线程环境下,每个线程所创建的FileSystem其实是同一个实例。其中一个线程关闭了FileSystem,也会导致其他线程不能正常执行。

由于它是可以配置的,因此,只需要在hdfs-site.xml中关闭cache即可。

<property>
    <name>fs.hdfs.impl.disable.cache</name>
    <value>true</value>
</property>

PS:还记得前几篇文章中,需要将 hdfs-site.xml 文件加入到 Resource中吗?

该配置是客户端配置,而非HDFS的全局配置。