Flink 模式下的 Checkpoint
Flink 的检查点机制,在流处理中,主要为了实现数据一致性和容错恢复,Checkpoint能够根据配置周期性地基于 Stream 中各个 Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,系统可以在发生故障时回滚,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。
另一个重要的概念:barrier,barrier 将数据流中的记录隔离成一系列的记录集合;barrier被注入到并行流的数据源,注入快照n (称为Sn)的barriers 是数据源中的一个位置,在 kafka的某个分区的最后一条记录的offset。这个位置Sn后续会汇报给JM的checkpoint 协调器。barrier随着流向下游流动,当中间的operator从他所有的输入流中收到checkpoint n 的barrier时,该operator会将barrier发送给他的下游operator。
JobManager -> Source(发送 barrier)-> 初始化 Checkpoint
Source -> 接收 Barrier -> Checkpoint 自己的 state -> 下游发送 Barrier
下游收到 Barrier 后,进行 Barrier Alignment 处理;Task 开始同步阶段的 Snapshot;Task 做完 Checkpoint 之后,再上报 JobManager
默认checkpoint功能是 disabled 的,想要使用的时候需要先启用。checkpoint开启之后,默认的checkPointMode是Exactly-once。
checkPointMode有两种:
- Exactly-once: 数据处理且只被处理一次
- At-least-once:数据至少被处理一次
算法模型下的 Checkpoint
在模型训练过程中,可以添加检查点(CheckPoint)用于保存模型的参数,以便进行推理及中断后再训练使用。
使用场景如下:
- 训练后推理场景
- 模型训练完毕后保存模型的参数,用于推理或预测操作
- 训练过程中,通过实时验证精度,把精度最高的模型参数保存下来,用于预测操作。
再训练场景:
- 进行长时间训练任务时,保存训练过程中的CheckPoint文件,防止任务异常退出后从初始状态开始训练。
Fine-tuning(微调)场景:
训练一个模型并保存参数,基于该模型,面向第二个类似任务进行模型训练。
MindSpore的CheckPoint文件是一个二进制文件,存储了所有训练参数的值。采用了Google的Protocol Buffers机制,与开发语言、平台无关,具有良好的可扩展性。 CheckPoint的protocol格式定义在mindspore/ccsrc/utils/checkpoint.proto中。
以下通过一个示例来介绍MindSpore保存和加载的功能,网络选取ResNet-50,数据集为MNIST。
在模型训练的过程中,使用 callback 机制传入回调函数ModelCheckpoint 对象,可以保存模型参数,生成 CheckPoint 文件。 通过CheckpointConfig对象可以设置CheckPoint的保存策略。 保存的参数分为网络参数和优化器参数。
数据湖版本管理
数据湖是一种存储系统,可以存储任意规模的结构化和非结构化数据。
Delta Lake:是由 Databricks 推出的一种基于 Apache Spark 和 Apache Parquet 的开源数据湖技术。Delta Lake 提供了 ACID 事务、数据版本控制、数据一致性、数据可靠性、数据质量等功能,使数据湖更加健壮和易于管理。Delta Lake 还支持 SQL 查询、Scala/Python/Java API、Spark Streaming、Delta Lake Connectors 等功能。
Apache Hudi:是由 Uber 提出的一种基于 Apache Hadoop 和 Apache Spark 的开源数据湖技术。Hudi 提供了支持增量更新和删除、支持数据版本控制和数据访问控制、支持数据一致性和数据质量控制等功能。Hudi 还支持多种存储格式和存储介质,包括 Parquet、ORC、HDFS、S3、等。
Apache Iceberg:是由 Netflix 提出的一种基于 Apache Hadoop 和 Apache Spark 的开源数据湖技术。Iceberg 提供了快速的数据写入和查询操作、数据版本控制、数据质量控制等功能。Iceberg 还支持多种存储介质和存储格式,包括 Parquet、ORC、Avro、HDFS、S3、等。同时,
Iceberg 支持 SQL 查询、Java API、Spark API 等多种数据访问方式。
优势:
- Iceberg 底层依赖的存储是像 HDFS 或 S3 这样的廉价存储
- Iceberg 是支持 parquet、orc、Avro 这样的列式存储。有列式存储的支持,就可以对 OLAP 分析进行基本的优化,在中间层直接进行计算。例如谓词下推最基本的 OLAP 优化策略,基于 Iceberg snapshot 的 Streaming reader 功能,可以把离线任务天级别到小时级别的延迟大大的降低,改造成一个近实时的数据湖分析系统。
劣势:
- 对于大表 JOIN 不够友好
- Flink 不支持创建带有隐藏分区的 Iceberg 表
- Flink 不支持带有 WaterMark 的 Iceberg 表
- Iceberg 只支持一种表存储模式,就是有 metadata file、manifest file 和 data file 组成存储结构,查询时首先查找 Metadata 元数据进而过滤找到对应的 SnapShot 对应的 manifest files ,再找到对应的数据文件。Hudi 支持两种表存储模式:Copy On Write(写时合并) 和 Merge On Read(读时合并),查询时直接读取对应的快照数据。
- 对于处理小文件合并时,Iceberg 只支持 API 方式手动处理合并小文件,Hudi 对于小文件合并处理可以根据配置自动的执行
- Spark 与 Iceberg 和 Hudi 整合时,Iceberg 对 SparkSQL 的支持目前来看更好。Spark 与 Hudi 整合更多的是 Spark DataFrame API 操作
- 关于 Schema 方面,Iceberg Schema 与计算引擎是解耦的,不依赖任何的计算引擎,而 Hudi 的 Schema 依赖于计算引擎 Schema。