【Flink】Flink Checkpoint 流程解析

张开发
2026/4/12 4:45:20 15 分钟阅读

分享文章

【Flink】Flink Checkpoint 流程解析
Flink Checkpoint 流程解析Checkpoint 流程解析Flink Checkpoint 流程解析Checkpint 流程概括Checkpoint 触发流程解析 (Flink 1.20)任务启动后 JobManager 开始定期对任务执行 CheckpointJobManager 使用 CheckpointCoordinator 触发 CheckpointCheckpointCoordinator 初始化 Checkpoint 所需要的信息触发所有 OperatorCoordinator Checkpoint触发 MastersHooks 状态快照CheckpointCoordinator 通知子任务开始 Checkpoint子任务开始触发 CheckpointMailBoxProcessor 异步执行 Checkpoint 事件初始化输入端状态触发 StreamOperator 状态快照下游算子接收到 CheckpointBarrier 后开始 CheckpointCheckpointBarrierHandler 处理 BarrierCheckpint 流程概括任务运行后 JobMaster 定时执行 CheckpointJobMaster 会通过调用 CheckpointCoordinator 对作业进行 Checkpoint。CheckpointCoordinator 开始进行 Checkpoint它首先会先创建 PendingCheckpoint然后开始给 Checkpoint 计时再关闭网关开始触发 OperatorCoordinator 的 Checkpoint。如果是 SourceOperatorCoordinator则这时会调用 Source 的 getSplitSerializer获取分片序列化器然后将 SplitAssignmentTracker 中任务运行时分配的分片序列化创建 Snapshot再将 Snapshot 放入 PendingCheckpoint 中。OperatorCoordinator 状态触发完后开始触发 MasterHooks 状态快照MasterTriggerRestoreHook 由 UDFStreamOperator 内部的实现 WithMasterCheckpointHook 接口的 Function 创建用于在 Master 触发 Checkpoint 时Function 需要进行的操作。MasterHooks 调用完后CheckpointCoordinator 将给子任务 TaskManager 发送请求通知它们开始 Checkpoint。TaskExecutor 获取相应的任务 TaskTask 调用 StreamTask 开始进行 CheckpointStreamTask 调用 Mailbox 执行 Checkpoint 事件Mailbox 执行 Checkpoint 事件时 Source 将不会从数据源读取数据。Checkpoint 事件开始执行如果 Checkpoint 需要强制对齐那么需要异步创建 Channel 和结果分区的数据快照 随后在执行传播 Barrier 前SubtaskCheckpointCoordinatorImpl 会调用 OperatorChain 让 Operator 进行 Barrier 前的准备操作然后开始往下游传播 Barrier。SubtaskCheckpointCoordinatorImpl 创建 CheckpointBarrier 并将 CheckpointBarrier 发送给 RecordWriterOutput 将 Barrier 传输给下游任务然后注册 Barrier 对齐超时计时器。Barrier 传播完后如果之前创建了 Channel 状态快照 那么还需要异步完成 Channel Output 的数据快照。最后 SubtaskCheckpointCoordinatorImpl 开始对当前子任务的所有算子进行 Checkpoint这时会进行算子创建快照时的操作算子状态是存储在 OperatorStateBackend 和 KeyedStateBackend 中的 SubtaskCheckpointCoordinator 将会创建 OperatorStateBackend 和 KeyedStateBackend 的状态快照。下游任务这时是正常处理上游发送过来的数据的但是上游正在进行 Checkpoint数据也是被发送过来的 CheckpointBarrier 分割开了处理到后面会接收到上游的 CheckpointBarrier也就表示着当前 Checkpoint 上游快照数据已经处理完下游也开始进行 Checkpoint 了下游进行 Checkpoint 的过程也是和上面的一样继续调用 SubtaskCheckpointCoordinatorImpl 开始进行 Checkpoint。总的来说Checkpoint 将创建 Coordinator 状态、托管键值状态、托管算子状态、未处理的键值状态、未处理的算子状态、输入通道状态和结果分区状态的快照。Checkpoint 触发流程解析 (Flink 1.20)任务启动后 JobManager 开始定期对任务执行 CheckpointTask 任务恢复Task#restoreAndInvoke…更新任务状态为 RUNNING 状态TaskExecutor 通知 JobMaster 任务状态更新TaskManagerActions#updateTaskExecutionStateTaskExecutor.TaskManagerActionsImpl#updateTaskExecutionStateJobMasterGateway#updateTaskExecutionState…JobMaster 调用 SchedulerBase、DefaultExecutionGraph 更新任务状态定期触发 CheckpointJobMaster#updateTaskExecutionStateSchedulerBase#updateTaskExecutionStateDefaultExecutionGraph#updateStateDefaultExecutionGraph#updateStateInternal[CheckpointCoordinator 开始定期执行 Checkpoint](#JobManager 使用 CheckpointCoordinator 触发 Checkpoint)CheckpointCoordinator#startCheckpointSchedulerJobManager 使用 CheckpointCoordinator 触发 CheckpointJobMaster 触发 CheckpointJobMaster#triggerCheckpoint调度器触发 CheckpointSchedulerNG#triggerCheckpoint从 ExecutionGraph 中获取 CheckpointCoordinator创建 CheckpointTriggerRequest并使用 CheckpointCoordinator 通过 CheckpointRequestDecider 决定需要处理的 Checkpoint 请求触发 CheckpointCheckpointCoordinator#triggerCheckpointCheckpointRequestDecider#chooseRequestToExecuteCheckpointCoordinator#startTriggeringCheckpointCheckpointCoordinator 初始化 Checkpoint 所需要的信息触发和通知所有 OperatorCoordinator 开始 CheckpointOperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion触发 MasterHooks 状态快照CheckpointCoordinator#snapshotMasterStateMasterTriggerRestoreHook#triggerCheckpointCheckpointCoordinator 通知子任务开始 CheckpointCheckpointCoordinator 初始化 Checkpoint 所需要的信息CheckpointCoordinator 初始化 Checkpoint 所需要的信息计算 Checkpoint 执行计划CheckpointPlanCalculator#calculateCheckpointPlan校验所有任务是否已经初始化如果有任务已经完成那么创建所有任务完成后计算检查点的计划DefaultCheckpointPlanCalculator#calculateAfterTasksFinished如果没有任务完成那么创建当所有任务都在运行时计算检查点的计划该计划为所有任务都将标记为需要触发 Checkpoint并将所有任务标记为需要等待和提交DefaultCheckpointPlanCalculator#calculateWithAllTasksRunning校验所有任务是否都在运行中Checkpoint 计数加一创建待处理的的 CheckpointCheckpointCoordinator#createPendingCheckpoint追溯待处理的 Checkpoint 状态CheckpointCoordinator#trackPendingCheckpointStats创建一个新的挂起检查点跟踪器CheckpointStatsTracker#reportPendingCheckpoint报告单个子任务的统计信息CheckpointCoordinator#reportFinishedTasks创建待处理的的 CheckpointPendingCheckpoint开始 Checkpoint 计时时间超时则取消 Checkpoint返回待处理的的 Checkpoint初始化 Checkpoint 地址CheckpointCoordinator#initializeCheckpointLocation如果该 Checkpoint 类型为 Savepoint则初始化 Savepoint 地址CheckpointStorageCoordinatorView#initializeLocationForSavepoint否则先初始化 Checkpoint Base 地址再开始初始化地址CheckpointStorageCoordinatorView#initializeBaseLocationsForCheckpointCheckpointStorageCoordinatorView#initializeLocationForCheckpoint返回 Checkpoint 地址触发所有 OperatorCoordinator Checkpoint触发和通知所有 OperatorCoordinator 开始 CheckpointOperatorCoordinatorCheckpoints#triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion触发 OperatorCoordinator CheckpointOperatorCoordinatorCheckpoints#triggerAllCoordinatorCheckpoints关闭网关获取并等待所有事件完成OperatorCoordinatorHolder#closeGatewaysIncompleteFuturesTracker#getCurrentIncompleteAndReset网关标记当前的 CheckpointOperatorCoordinator 触发 CheckpointOperatorCoordinator#checkpointCoordinator根据 Coordinator 的 Checkpoint 后的状态创建并返回 CoordinatorSnapshot通知所有 CheckpointCoordinator Checkpoint 结果OperatorCoordinatorCheckpoints#acknowledgeAllCoordinators触发 MastersHooks 状态快照触发 MasterHooks 状态快照MasterTriggerRestoreHook 由 UDFOperator 内部的实现 WithMasterCheckpointHook 接口的 UDF 创建表示在 Master 触发 Checkpoint 时UDF 可以做什么。CheckpointCoordinator#snapshotMasterStateMasterTriggerRestoreHook#triggerCheckpointCheckpointCoordinator 通知子任务开始 CheckpointCheckpointCoordinator 给子任务发送 Checkpoint 请求CheckpointCoordinator#triggerCheckpointRequest发送任务 Checkpoint 请求CheckpointCoordinator#triggerTasks向所有的 Exeuction 对应的 Taskmanager 网关发送 Checkpoint 请求子任务接收到请求后会开始触发 CheckpointExecution#triggerCheckpointHelperTaskManagerGateway#triggerCheckpoint任务 Checkpoint 请求发送完后取消定时器子任务开始触发 CheckpointTaskManager 触发指定子任务的 CheckpointTaskExecutor#triggerCheckpointTask#triggerCheckpointBarrier创建 Checkpoint 元数据 CheckpointMetaData算子 Mailbox 异步执行 Checkpoint因为 Checkpoint 在 MailboxProcessor 执行所以这时将不会有数据传入CheckpointableTask#triggerCheckpointAsyncStreamTask#triggerCheckpointAsync如果 InputGateway 分区数据未处理完成则触发未完成的数据通道 CheckpointStreamTask#triggerUnfinishedChannelsCheckpoint这这情况是考虑已完成任务的 Checkpoint 如果非 Source 任务成为新的主任务则可能会通过 RPC 触发检查点。在这种情况下他们将通知该检查点的 CheckpointBarrierHandle。创建一个 CheckpointBarrier并通知所有未完成的 Channel 处理该 Barrier并尝试触发 CheckpointCheckpointBarrierHandler#processBarrier如果 InputGateway 分区数据已经处理完成则直接开始触发 CheckpointStreamTask#triggerCheckpointAsyncInMailbox初始化输入端 Channel 状态SubtaskCheckpointCoordinator#initInputsCheckpointSubtaskCheckpointCoordinatorImpl 开始执行 CheckpointStreamTask#performCheckpointSubtaskCheckpointCoordinator#checkpointStateMailBoxProcessor 异步执行 Checkpoint 事件算子调用 SubtaskCheckpointCoordinator 执行 CheckpointSubtaskCheckpointCoordinatorImpl#checkpointState如果当前 Checkpint 被终止了那么向下游发送 CancelCheckpointMarker事件以防下游背压并结束当前 Checkpoint。如果 Checkpoint 之前没有对齐过并且 Checkpoint 配置的对齐类型是强制对齐那么首先将当前 Checkpoint 类型设置为不再需要对齐了然后初始化输入端的状态可见初始化输入端状态CheckpointOptions#withUnalignedSupportedSubtaskCheckpointCoordinatorImpl#initInputsCheckpoint准备 Checkpoint算子执行 Snapshot 和 发送 Barrier 前的操作OperatorChain#prepareSnapshotPreBarrier创建 CheckpointBarrier并往下游发送 CheckpointBarrier 事件开始 Barrier 对齐操作OperatorChain#broadcastEvent注册对齐计时器以在超时时对齐未对齐的 barrierSubtaskCheckpointCoordinator#registerAlignmentTimer如果前面进行了 Channel Checkpoint那么在这里完成状态通道 Writer 快照ChannelStateWriter#finishOutputSubtaskCheckpointCoordinator 同步获取算子的所有的状态快照SubtaskCheckpointCoordinator#takeSnapshotSync如果 Checkpoint 是可超时和可不对齐的则从 ChannelStateWriter 中获取通道状态写结果ChannelStateWriteResult解析 Checkpoint 存储地址SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#resolveCheckpointStorageLocation触发 OpeartorChain 状态快照OperatorChain#snapshotState如果是 RegularOperatorChain则获取所有算子并触发所有算子的状态快照RegularOperatorChain#buildOperatorSnapshotFutures**构建 StreamOpeartor 算子状态快照 Future **StreamOperator#snapshotState如果算子是主算子或者是尾算子那么将通道和结果分区的状态快照结果 Future 设置到AsyncCheckpointRunnable 中如果是FinishedOperatorChain则只将通道和结果分区的状态快照结果 Future 设置到 OperatorSnapshotFutures 中向 CheckpointCoordinator 发送已接收 Checkpoint 事件OperatorChain#sendAcknowledgeCheckpointEvent清理 Checkpoint 缓存SubtaskCheckpointCoordinatorImpl.CachingCheckpointStorageWorkerView#clearCacheFor设置 Checkpoint 持续时间的指标CheckpointMetricsBuilder#setSyncDurationMillis如果获取 SnapShot 成功则异步完成 CheckpointSubtaskCheckpointCoordinator#finishAndReportAsync创建并异步执行 AsyncCheckpointRunnableAsyncCheckpointRunnable#start开始状态快照并等待所有 SnapshotFuture 完成AsyncCheckpointRunnable#finalizedFinishedSnapshotsAsyncCheckpointRunnable#finalizeNonFinishedSnapshots计算 Channel 和分区对齐时的状态大小并设置相关指标否则清理 SubtaskCheckpointCoordinatorSubtaskCheckpointCoordinator#cleanup初始化输入端状态子任务初始化 CheckpointSubtaskCheckpointCoordinatorImpl#initInputsCheckpoint如果 Checkpoint 可不需要对齐初始化写状态通道ChannelStateWriter#start创建CheckpointStartRequest并将请求分发到 WriterChannelStateWriteRequestDispatcher#dispatch分发器处理 CheckpointStartRequestChannelStateWriteRequestDispatcherImpl#handleCheckpointStartRequest为该子任务 Writer 注册 ChannelStateWriteResult用于收集 Checkpoint 过程中传输过来的数据ChannelStateCheckpointWriter#registerSubtaskResult准备正在传输中的数据快照等待输入端的数据达到 BarrierSubtaskCheckpointCoordinatorImpl#prepareInflightDataSnapshot准备输入端快照StreamTask#prepareInputSnapshotStreamTaskInput#prepareSnapshot网络输入端准备快照StreamTaskNetworkInput#prepareSnapshot获取所有还未处理的 Buffer并添加到状态写状态通道中ChannelStateWriter#addInputData返回所有 Barriers 屏障接受 Future等所有 Barriers 屏障接受后完成对给定检查点 id 的通道状态数据的写入将 CheckpointInProgressRequest 请求提交到通道状态写请求执行器(ChannelStateWriteRequestExecutor)中通道状态写请求执行器执行对应请求ChannelStateCheckpointWriter#completeInput完成状态写入写入的状态存放在 ChannelStateWriteResult 中里面存放着写入的状态柄 InputChannelStateHandle 和 ResultSubpartitionStateHandleChannelStateCheckpointWriter#finishWriteAndResult如果 Checkpoint 是可超时的那么除了上面准备输入端快照那一步骤外其他步骤都需要进行触发 StreamOperator 状态快照触发 StreamOperator 的 CheckpointRegularOperatorChain#checkpointStreamOperatorStreamOperatorStateHandler 创建快照StreamOperatorStateHandler#snapshotState创建算子快照环境和算子快照 Futures真正的触发算子快照该步操作可以通过算子自定义StreamOperatorStateHandler.CheckpointedStreamOperator#snapshotState算子和 Keyd 状态后端触发快照Snapshotable#snapshot下游算子接收到 CheckpointBarrier 后开始 Checkpoint下游算子处理上游发送过来的事件CheckpointedInputGate#handleEvent如果接收到的事件为 CheckpointBarrier 事件则开始处理 Barrier尝试开始 CheckpointCheckpointBarrierHandler#processBarrierCheckpointBarrierHandler 处理 BarrierCheckpointBarrierHandler 处理 BarrierCheckpointBarrierHandler#processBarrier如果该 Barrier Id 大于上一次 PendingCheckpoint 的 Id 并且当前开启的 Channel 只有一个标记对齐开始和结束并通知开始 Checkpoint然后结束该次处理CheckpointBarrierHandler#markAlignmentStartAndEndCheckpointBarrierHandler#notifyCheckpointStreamTask#triggerCheckpointOnBarrierSubtaskCheckpointCoordinator#checkpointState否则尝试从等待的 Checkpoint 队列中寻找该 CheckpointBarrier如果找到了则说明 Barrier 已经对齐标记已经完成对齐并开始触发 Checkpoint可见[MailBoxProcessor 异步执行 Checkpoint 事件](#MailBoxProcessor 异步执行 Checkpoint 事件)CheckpointBarrierTracker#triggerCheckpointOnAlignedCheckpointBarrierHandler#notifyCheckpointStreamTask#triggerCheckpointOnBarrierSubtaskCheckpointCoordinator#checkpointState否则将该 Barrier 添加到Checkpoint 队列中开始对齐参考Flink Stateful Stream Processinghttps://nightlies.apache.org/flink/flink-docs-release-1.20/docs/concepts/stateful-stream-processing/

更多文章