banner
bladedragon

bladedragon

Hadoop CommitProtocol介绍

一些笔记的归档。

Hadoop CommitProtocol#

hadoop commitProtocol(全称 HadoopMapReduceCommitProtocol)是一套用于用于提交文件到文件系统的规则。是 hadoop 抽象出来的,为了实现存算分离,兼容不同底层文件系统的机制。

这里我们假定:job 为某种需要并行计算的查询作业;在大数据计算框架中,这种作业往往由多个 task 组成,以 DAG 的方式触发执行;job 由调度程序 JobManager 调起,并将 task 分发到各个 executor 中执行作业,最后聚合并返回最终结果。

那么我们可以将 commitProtocol 的基本执行逻辑简化成如下方法的集合:

  1. setupJob(): 在执行程序初始化 job,例如创建必要的临时目录等
  2. setupTask(): 在 executor 初始化 task,创建临时目录等
  3. needTaskCommit(): 是否需要提交 task
  4. commitTask(): 提交 task,将 task 产生的临时输出提交到最终的输出路径
  5. abortTask(): 当需要终止 task 时,需要清理 task 产生的临时文件
  6. commitJob(): 当确保所有 task 提交完成后,将提交整个 job 的最终结果
  7. abortJob(): 终止并清理 job

image (1)

commitProtocol 相当于是指导 Mapreduce 作业具体运行在存储系统上的一套标准。考虑到在 MapReduce 作业一般都是大规模的耗时作业,其中难免会存在各种异常导致部分 task 失败。因此 commitProtocol 的实现也需要格外考虑数据正确性和事务一致性。

FileOutputCommitter#

和 HadoopCommitProtol 专注于作业整体的提交流程不同,committer 专注于具体的 commit 操作(taskCommit 和 jobCommit)。在 hadoop 原生实现中,主要实现在 FileOutputCommitter 中,并针对性能和隔离性实现了两种算法

commitV1#

在 commitTask 时,数据文件会从 task 临时目录 rename 到 job 临时目录中,并在 commitJob 时,从 job 临时目录 rename 到正式目录。由于数据在 job 完成前保持良好的隔离性,失败作业可快速从上次临时目录中恢复数据。

commitV2#

在 commitTask 时,数据文件会从 task 临时目录 rename 到 job 的正式目录,从而在 commitJob 时只需要确认数据完整并进行成功标记即可。但是由于 commiTask 阶段的 rename 操作对外可见,因此隔离性不佳,当作业失败时,也无法快速恢复。

Spark CommitProtocol#

commitProtocol 在 Hadoop 的经典实现是 yarn 和 spark 上。由于现在基本上都从 MR 作业转向了 spark 作业,所以这里也只介绍 spark 的 commitProtocol 协议。

spark 的 commitProtocol 实际上在 hadoopCommitProtocol 基础上进行了封装,使其适配 spark 的计算执行模型,同时补齐了 hadoopCommitProtocol 的部分缺陷,提升了扩展性。

除此之外,因为继承了 hadoopCommitProtocol, spark 自然也使用 FileOutputCommitter 用于写入文件。如果想要接入其他 committer,也可以通过继承并重写 commitProtocol 的方式去实现,或者显式指定 committer,这在 spark 中均有配置实现。

image-20231014130948900

其与 hadoopComitProtocol 的差异在于:

  1. 确保了 task attempt 进行 commitTask 时的正确性(OutputCommitCoordinator)

spark 引入了 OutputCommitCoodinator 跟踪每个 stage 里 task attempt 的状态,如果某个 task attempt 成功,后续的所有 attempt 发起的 commitTask 都会被拒绝。通过这种方式保证 commitTask 执行的正确性

image (2)

  1. 支持 hive 外表数据读写和动态分区重写

在 hive 表查询中,数据可能写到内部表,也可能写到外部表。外部表的数据往往存储在不同的位置。为了支持这种需要输出数据到不同路径的操作,spark commitProtocol 允许记录文件的绝对路径 newTaskTempFileAbsPath ().。在读取文件时将文件的绝对路径保存下来,并在写入时获取绝对路径写入。

动态分区重写指的是 spark 在写入数据得时候允许只重写指定分区的数据。避免重写整张表,增大开销。
什么是动态分区:

若 sql 中未指定分区字段的具体值,使得该分区可以在计算中自动推断出,即为动态分区。

例子:

insert overwrite table ${table_name} partition (dayno=${v_day}, hour=${v_dayhour_hh}, region)
select * from ${query_table_name}
where dayno = ${v_day};

开启动态分区重写的条件:

  1. spark.sql.sources.partitionOverwriteMode=dynamic
  2. SaveMode=overwrite
  3. sql 中存在动态分区的字段

动态分区重写的基本逻辑

  1. 如果开启动态分区重写,spark 会将 job 目录下生成的临时数据文件生成在 staging 目录下
  2. 针对外部表的数据路径,在每次 rename 绝对路径的分区文件前,需要先清理并重建其父级目录避免 rename 时不存在(overwrite)
  3. 针对内部表的数据路径,由于 commitJob 后路径仍然在 staging 目录下,所以需要将所有文件和目录从 staging 目录 rename 到正式目录

参考 issue:[SPARK-35106]

存在的问题#

基于之前一篇文章, 当我们通过 Hadoop MapReduce 或者 Spark 将数据写入到 S3 时,就会存在相当严重的的性能问题。

在 s3 中,只提供了以下六种基础操作,所以其他操作只是这几种操作的衍生。例如rename =list + copy + delete. 这导致 rename 操作是非原子性的。传统文件 rename 操作需要在 s3 上实现 CREATE + DELETE 操作,目录的 rename 则还需要额外 LIST 所有文件的操作,这导致 rename 操作不是原子的,而且随着文件数的增加,rename 速度会越来越慢。

操作名解释
PUT对象的原子写操作
GET获取对象
HEAD获取对象的元数据
LIST基于前缀列出所有对象
COPY复制单个对象
DELETE删除对象

而 rename 在传统的 commit 算法中是不可或缺的一环,这将直接导致 commit 算法的性能和正确性受到了挑战。

另外,在 spark 中也存在 staging 操作。staging 是 spark 作业中重要的步骤,在动态分区重写的时候,必须要通过 staging 来保障数据的一致性。但是 staging 中同样存在文件的 rename 操作,在 s3 的场景,这样的操作会带来很大的开销。

S3A Committer#

随着在 s3 上处理数据规模的增大,除了上述提到了 6 种基础操作,AWS 还额外提供了两种操作用来应对大规模的数据传输。

  • Bulk Delete
  • Multipart Upload
    后者将是解决 s3 在提交文件性能问题上的关键。

简单介绍一下 Multipart Upload 机制(后面简称 MPU):
multiPartUpload 主要分为三个阶段

  1. 初始化
  2. 上传块
  3. 完成最终 POST 操作

参考 aws 官方文档

在上传过程中,已上传的块在 s3 中是不可见的,但是会占用实际存储。只有当 s3 最后 POST 操作完成后,s3 的 manifest 才会写入该文件,该文件才会在 s3 上可见。

因此。hadoop 的开发人员充分利用了该机制,设计了两种 S3A Committer: Staging CommitterMagic Committer

Staging Committer#

在 stagingCommitter 上,数据在 taskAttempt 时被加载到本地进行计算,并在 commitTask 的时候上传到 s3 上,此时文件被直接 MPU 到最终的 job 目录上,但是文件信息被上传到 HDFS 上进行 传统的 FileOutputCommitter 的 V1 算法操作。commitJob 阶段,最终在 hdfs 上完成最后的 rename 后,执行最终 POST 操作,上传的文件在 s3 上才最终可见

Magic Committer#

在 magicCommitter 上,数据从一开始就允许直接写到 s3 上,不需要落到本地。magicCommitter 会在 job 目录下创建一个_magic 目录,taskAttempt 最后输出的文件都已 magic 文件流的形式 MPU 到这个目录下。注意此时并没有完成 MPU,该次 MPU 的 manifest 会写到文件同名的一个.pending 文件中(同样在_magic 目录下)。在 commitTask 时,由于文件数据已经上
传到 s3 上,只需要加载 taskAttempt 路径下所有.pending 文件,聚合成.pendingset 文件上传。在 commitJob 阶段,则是获取所有.pendingset 文件,这里就已经获取到所有需要 commit 的文件的 manifest,因此依次完成最终 POST 操作即可。


更多详细内容,可以去阅读 Hadoop 中 committer 实现的源码来深入学习。

总的来说,S3A Committer 都是通过延迟文件最后提交操作来避免 rename 操作。基于当前 s3 的版本,两种 committer 基本都能解决性能问题。不过,hadoop 对 committer 的实现更多考虑了通用性,而针对部分特化的业务场景,则没有给到足够的支持。(简单讲就是还无法做到开箱即用)例如在 spark 的动态分区重写机制以及写入 hive 外部表的机制,仍然需要自己实现。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。