An Investigation into Flink's Checkpoint Mechanism


Background

Checkpointing is enabled in a Flink job, with the state backend configured as:

StateBackend stateBackend = new RocksDBStateBackend("file:///opt/flink/checkpoint", false);

The Flink cluster is a standalone cluster, structured as follows:

JobManager:(host1)

TaskManager-0:(host2)

TaskManager-1:(host3)

After the job starts, it is scheduled onto TaskManager-0. The checkpoint directory can be seen on host2:

/opt/flink/checkpoint/
    --chk-0
    --chk-1
    --chk-x

On host1, the checkpoint directory looks like:

/opt/flink/checkpoint/
    --shared
    --taskowned

No checkpoint directory appears on host3.

When TaskManager-0 goes down, the Flink job restarts and is rescheduled onto TaskManager-1, where it fails with the following error:

2020-07-20 10:41:51
java.lang.Exception: Exception while creating StreamOperatorStateContext.
  at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_0fbdcee0106dabd750aa5035ed33c1f9_(1/1) from any of the 1 provided restore options.
  at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
  at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252)
  at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
  ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
  at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565)
  at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243)
  at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
  at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
  ... 11 more
Caused by: java.io.FileNotFoundException: /opt/flink/data/huazhi_analysis/ca8097a6b12c714317854de13277c314/chk-3/a28cff4c-0b82-41bd-9427-7e1d364689a9 (No such file or directory)
  at java.io.FileInputStream.open0(Native Method)
  at java.io.FileInputStream.open(FileInputStream.java:195)
  at java.io.FileInputStream.<init>(FileInputStream.java:138)
  at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
  at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
  at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
  at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
  at org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
  at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:73)
  at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
  ... 15 more

In short, the checkpoint file cannot be found.

With this as background, let's dig into Flink's checkpoint mechanism.

Tracing the Checkpoint Flow

Setting the State Backend

StateBackend stateBackend = new RocksDBStateBackend("file:///opt/flink/checkpoint", false);

On the surface this is a RocksDB state backend, but at checkpoint time it actually uses an FsStateBackend, as its constructor shows:

public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
    this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing);
}

The two are related as follows:

RocksDBStateBackend: the state values produced between two checkpoints are kept in RocksDB.

FsStateBackend: when a checkpoint is triggered, the state values are pulled from RocksDB and persisted to the file system.
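This division of labor can be pictured with a toy model in plain Java (an illustrative sketch, not Flink code: an in-memory map stands in for RocksDB, and a directory of chk-x subdirectories stands in for FsStateBackend's checkpoint storage):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Toy model: working state lives in a local store (the RocksDB role);
// on checkpoint, a snapshot of that state is written to a durable
// directory as chk-<id> (the FsStateBackend role).
public class BackendRolesSketch {
    private final Map<String, String> workingState = new HashMap<>(); // stands in for RocksDB
    private final Path checkpointDir;                                 // stands in for file:///opt/flink/checkpoint
    private long checkpointId = 0;

    public BackendRolesSketch(Path checkpointDir) {
        this.checkpointDir = checkpointDir;
    }

    public void update(String key, String value) {
        workingState.put(key, value); // between checkpoints, state only changes locally
    }

    // On trigger: pull the current state from the local store and persist it under chk-<id>
    public Path checkpoint() throws IOException {
        Path chk = checkpointDir.resolve("chk-" + (checkpointId++));
        Files.createDirectories(chk);
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : workingState.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('\n');
        }
        return Files.write(chk.resolve("state"), sb.toString().getBytes());
    }
}
```

Note that in this toy model, as in the real setup above, the durable directory is only as shared as the file system backing it.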

Triggering Checkpoints in the JobManager

While executing, Flink builds an ExecutionGraph, which has an enableCheckpointing method that enables the checkpoint mechanism.

public void enableCheckpointing() {
    checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);

    // register a checkpoint listener
    if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }
}

This method has two core pieces of logic: initializing the CheckpointCoordinator, and registering a listener.

Initializing the CheckpointCoordinator

When the CheckpointCoordinator is initialized, its constructor first creates the checkpoint directory structure:

this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
checkpointStorage.initializeBaseLocations();

CheckpointCoordinator is a core class: triggering checkpoints, completing them, and stopping them all run through it. It runs inside the JobManager.

Registering the CheckpointCoordinatorDeActivator

This listener watches for changes in the JobStatus and activates or deactivates the periodic checkpoint scheduler accordingly.

The core code:

public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        // coordinator is the CheckpointCoordinator initialized above
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}
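The activate/deactivate behavior can be sketched with a plain ScheduledExecutorService (an illustrative model, not Flink's actual classes; the names and timings here are made up):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the activator/deactivator idea: a status listener that starts a
// periodic trigger when the job is RUNNING and stops it otherwise.
public class SchedulerToggleSketch {
    enum JobStatus { CREATED, RUNNING, FAILED }

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> periodicTrigger;
    private int triggeredCount = 0;

    public synchronized void jobStatusChanges(JobStatus newStatus) {
        if (newStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            periodicTrigger = timer.scheduleAtFixedRate(
                () -> { synchronized (this) { triggeredCount++; } },
                0, 50, TimeUnit.MILLISECONDS);
        } else if (periodicTrigger != null) {
            // anything else stops the trigger
            periodicTrigger.cancel(false);
        }
    }

    public synchronized int getTriggeredCount() { return triggeredCount; }
    public void shutdown() { timer.shutdownNow(); }
}
```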

Sending Checkpoint Requests to Source Tasks

Once the checkpoint scheduler is started, a ScheduledTrigger runs periodically to send checkpoint requests to each source task.

ScheduledTrigger is a class implementing Runnable; its run method calls CheckpointCoordinator.triggerCheckpoint(), which carries out the actual trigger logic.

The core code:

public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
    long timestamp,
    CheckpointProperties props,
    @Nullable String externalSavepointLocation,
    boolean isPeriodic,
    boolean advanceToEndOfTime) throws CheckpointException {

    if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
        throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
    }

    // make some eager pre-checks
    synchronized (lock) {
        preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
    }

    // check if all tasks that we need to trigger are running.
    // if not, abort the checkpoint
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                     tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                     job);
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                     tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                     job,
                     ExecutionState.RUNNING,
                     ee.getState());
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // next, check if all tasks that need to acknowledge the checkpoint are running.
    // if not, abort the checkpoint
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                     ev.getTaskNameWithSubtaskIndex(),
                     job);
            throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // we will actually trigger this checkpoint!

    final CheckpointStorageLocation checkpointStorageLocation;
    final long checkpointID;

    try {
        // this must happen outside the coordinator-wide lock, because it communicates
        // with external services (in HA mode) and may block for a while.
        checkpointID = checkpointIdCounter.getAndIncrement();

        checkpointStorageLocation = props.isSavepoint() ?
            checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
        checkpointStorage.initializeLocationForCheckpoint(checkpointID);
    }
    catch (Throwable t) {
        int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
        LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                 job,
                 numUnsuccessful,
                 t);
        throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
    }

    final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks,
        masterHooks.keySet(),
        props,
        checkpointStorageLocation,
        executor);

    if (statsTracker != null) {
        PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
            checkpointID,
            timestamp,
            props);

        checkpoint.setStatsCallback(callback);
    }

    // schedule the timer that will clean up the expired checkpoints
    final Runnable canceller = () -> {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);

                triggerQueuedRequests();
            }
        }
    };

    try {
        // re-acquire the coordinator-wide lock
        synchronized (lock) {
            preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());

            LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);

            pendingCheckpoints.put(checkpointID, checkpoint);

            ScheduledFuture<?> cancellerHandle = timer.schedule(
                canceller,
                checkpointTimeout, TimeUnit.MILLISECONDS);

            if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                // checkpoint is already disposed!
                cancellerHandle.cancel(false);
            }

            // TODO, asynchronously snapshots master hook without waiting here
            for (MasterTriggerRestoreHook<?> masterHook : masterHooks.values()) {
                final MasterState masterState =
                    MasterHooks.triggerHook(masterHook, checkpointID, timestamp, executor)
                    .get(checkpointTimeout, TimeUnit.MILLISECONDS);
                checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
            }
            Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
        }
        // end of lock scope

        final CheckpointOptions checkpointOptions = new CheckpointOptions(
            props.getCheckpointType(),
            checkpointStorageLocation.getLocationReference());

        // send the messages to the tasks that trigger their checkpoint
        for (Execution execution: executions) {
            if (props.isSynchronous()) {
                execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
            } else {
                execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
            }
        }

        numUnsuccessfulCheckpointsTriggers.set(0);
        return checkpoint.getCompletionFuture();
    }
    catch (Throwable t) {
        // guard the map against concurrent modifications
        synchronized (lock) {
            pendingCheckpoints.remove(checkpointID);
        }

        int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
        LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                 checkpointID, job, numUnsuccessful, t);

        if (!checkpoint.isDiscarded()) {
            failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
        }

        try {
            checkpointStorageLocation.disposeOnFailure();
        }
        catch (Throwable t2) {
            LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
        }

        // rethrow the CheckpointException directly.
        if (t instanceof CheckpointException) {
            throw (CheckpointException) t;
        }
        throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
    }
}

Its main steps are:

  1. Find all tasks that must receive the checkpoint message

    tasksToTrigger is determined when the JobGraph is generated; these tasks are collected into executions (messages are sent to them in step 5).

  2. Find the tasks that must acknowledge the checkpoint

    ackTasks

  3. Create the checkpoint directory for this checkpointId

    A subdirectory named chk-x is created under the checkpoint directory.

  4. Register a canceller

    If the checkpoint has not finished within checkpointTimeout, it is cancelled.

  5. Send the checkpoint message to each task

    Loop over all tasks found in step 1 and call:

    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
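Step 4, the expiry canceller, can be sketched with plain java.util.concurrent primitives (a simplified model, not Flink code; the method runOnce and its return values are made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch: schedule a canceller that expires a pending checkpoint unless the
// checkpoint completes within the timeout, mirroring the canceller Runnable above.
public class CancellerSketch {
    public static String runOnce(long timeoutMs, long completeAfterMs) throws Exception {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CompletableFuture<String> pending = new CompletableFuture<>();

        // the canceller only takes effect if the checkpoint is still pending
        ScheduledFuture<?> cancellerHandle = timer.schedule(
            () -> pending.completeExceptionally(new TimeoutException("CHECKPOINT_EXPIRED")),
            timeoutMs, TimeUnit.MILLISECONDS);

        // simulate all acknowledgements arriving after completeAfterMs
        timer.schedule(() -> {
            if (pending.complete("COMPLETED")) {
                cancellerHandle.cancel(false); // completion discards the canceller
            }
        }, completeAfterMs, TimeUnit.MILLISECONDS);

        try {
            return pending.get();
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } finally {
            timer.shutdownNow();
        }
    }
}
```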

All of the above runs in the JobManager. After the checkpoint messages are sent, the source tasks running in the TaskManagers begin checkpointing.

Note: a source task is a Task that executes SourceFunction logic.

Executing the Checkpoint in the TaskManager

After the JobMaster sends the checkpoint request, a source task, which runs inside a TaskManager process, receives it through the TaskManager's Akka service. The message is handled by a flink-akka.actor.default-dispatcher-* thread (the pool is created when the TaskManager starts), and the handler calls Task.triggerCheckpointBarrier to carry out the checkpoint.

[Figure: call stack from Task.triggerCheckpointBarrier down to StreamTask.performCheckpoint]

The final handling happens in StreamTask's performCheckpoint method. The core code:

private boolean performCheckpoint(
    CheckpointMetaData checkpointMetaData,
    CheckpointOptions checkpointOptions,
    CheckpointMetrics checkpointMetrics,
    boolean advanceToEndOfTime) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
              checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    final long checkpointId = checkpointMetaData.getCheckpointId();

    if (isRunning) {
        actionExecutor.runThrowing(() -> {

            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);

                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            operatorChain.prepareSnapshotPreBarrier(checkpointId);

            // Step (2): Send the checkpoint barrier downstream
            operatorChain.broadcastCheckpointBarrier(
                checkpointId,
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

        });

        return true;
    } else {
        actionExecutor.runThrowing(() -> {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });

        return false;
    }
}

Of these, steps (2) and (3) are the core.
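The role of step (2) can be shown with a toy illustration (not Flink code): the barrier is emitted into the output stream in line with regular records, so a downstream consumer snapshots exactly the state produced by the records before the barrier.

```java
import java.util.Queue;

// Toy model of a checkpoint barrier flowing through a channel of records.
public class BarrierSketch {
    static final String BARRIER = "BARRIER-1";

    // downstream operator: counts records until it sees the barrier, then "snapshots"
    public static int consumeUntilBarrier(Queue<String> channel) {
        int recordsBeforeBarrier = 0;
        for (String element; (element = channel.poll()) != null; ) {
            if (BARRIER.equals(element)) {
                return recordsBeforeBarrier; // snapshot point: state covers pre-barrier records only
            }
            recordsBeforeBarrier++;
        }
        return -1; // no barrier seen
    }
}
```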

Checkpointing the State

Checkpointing the state mainly means invoking a checkpoint on each operator of the task:

checkpointingOperation.executeCheckpointing();

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        for (StreamOperator<?> op : allOperators) {
            // invoke each operator's checkpoint: the synchronous part
            checkpointStreamOperator(op);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                      checkpointMetaData.getCheckpointId(), owner.getName());
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            owner,
            operatorSnapshotsInProgress,
            checkpointMetaData,
            checkpointMetrics,
            startAsyncPartNano);

        // asynchronous part of the checkpoint
        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                      "Alignment duration: {} ms, snapshot duration {} ms",
                      owner.getName(), checkpointMetaData.getCheckpointId(),
                      checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                      checkpointMetrics.getSyncDurationMillis());
        }
    } catch (Exception ex) {
       //...
    }
}

The Synchronous Part

Each operator is checkpointed the same way, via the snapshotState method of AbstractStreamOperator.

public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
                                                   CheckpointStreamFactory factory) throws Exception {

    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
        keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

    StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
        checkpointId,
        timestamp,
        factory,
        keyGroupRange,
        getContainingTask().getCancelables());

    try {
        snapshotState(snapshotContext);

        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }

        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        try {
            snapshotInProgress.cancel();
        } catch (Exception e) {
            snapshotException.addSuppressed(e);
        }

        String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
            getOperatorName() + ".";

        if (!getContainingTask().isCanceled()) {
            LOG.info(snapshotFailMessage, snapshotException);
        }
        try {
            snapshotContext.closeExceptionally();
        } catch (IOException e) {
            snapshotException.addSuppressed(e);
        }
        throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
    }

    return snapshotInProgress;
}

The operatorStateBackend.snapshot method writes the state into snapshotInProgress as futures, waiting to be written to the checkpoint directory.

The Asynchronous Part

After the synchronous part finishes, the concrete write is handed off to an asynchronous implementation, executed by threads named AsyncOperations-thread-*.

AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
    owner,
    operatorSnapshotsInProgress,
    checkpointMetaData,
    checkpointMetrics,
    startAsyncPartNano);

owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

The asynchronous write is performed by AsyncCheckpointRunnable.run. It mainly does two things: it waits for the operator snapshot futures to complete, materializing the state to the checkpoint storage, and it then reports the resulting state handles back so the Task can acknowledge the checkpoint to the JobManager.
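The sync-then-async split can be illustrated with a toy model (plain Java, not Flink code): the synchronous part takes a consistent copy of the state on the processing thread, and a worker thread "persists" it afterwards without blocking processing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Toy model of the sync/async checkpoint split.
public class SyncAsyncCheckpointSketch {
    public static Future<String> checkpoint(ExecutorService asyncPool, Map<String, String> state) {
        // synchronous part: take a cheap, consistent copy of the state
        final Map<String, String> snapshot = new HashMap<>(state);

        // asynchronous part: "persist" the snapshot off the processing thread
        return asyncPool.submit(() -> "persisted " + snapshot.size() + " entries");
    }
}
```

Because the copy is taken synchronously, later mutations of the live state cannot leak into the snapshot being written.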

Checkpointing in Non-Source Tasks

Source tasks are triggered by the checkpoint message from the JobManager, while non-source tasks are triggered by the checkpoint barriers arriving from their upstream tasks. Beyond that, their execution logic is similar to that of source tasks.

Notifying the JobManager on Completion

After a task finishes its checkpoint, it sends an AcknowledgeCheckpoint message to the JobManager, which again handles it in the CheckpointCoordinator. The core code:

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
    // ...
    switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
        case SUCCESS:
            LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
            if (checkpoint.areTasksFullyAcknowledged()) {
                completePendingCheckpoint(checkpoint);
            }
            break;
        // ...
    }
    // ...
}

The core logic: each successful acknowledgement is recorded on the pending checkpoint, and once every task that must acknowledge has done so, completePendingCheckpoint finalizes the checkpoint.
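The acknowledgement bookkeeping can be sketched like this (a minimal model, not Flink's actual PendingCheckpoint):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch: a pending checkpoint tracks which tasks still have to acknowledge,
// and completes once the set is empty.
public class PendingCheckpointSketch {
    private final Set<String> notYetAcknowledged;
    private boolean completed = false;

    public PendingCheckpointSketch(Set<String> tasksToWaitFor) {
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    public synchronized void acknowledgeTask(String taskId) {
        if (notYetAcknowledged.remove(taskId) && notYetAcknowledged.isEmpty()) {
            completed = true; // all required tasks acknowledged -> complete the checkpoint
        }
    }

    public synchronized boolean isCompleted() { return completed; }
}
```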

Summary

To sum up: when Flink checkpoints with RocksDB and the checkpoint path points at the local file system, each TaskManager writes its state to its own local directory; nothing is replicated to the other TaskManagers. To let multiple TaskManagers share state, the checkpoint directory should live on a shared, third-party file system.

FsStateBackend supports third-party file systems such as HDFS, local files, S3, FTP, and SFTP.

A standalone Flink cluster may have nothing to do with Hadoop and thus no HDFS at hand, but FTP is an option for storing state. With an FTP server set up (see other material for that), state can be stored on it like this:

new RocksDBStateBackend("ftp://abc:def@192.168.1.xx:21/home/abc/chk", false);

Each file system needs a corresponding FileSystem implementation, so using FTP requires FTPFileSystem on the classpath. Adding the hadoop-common-3.0.0 dependency is enough.


Reference:

Flink checkpoint流程源码分析 (a source-level analysis of the Flink checkpoint flow)