Archiving some notes.
## Hadoop CommitProtocol
Hadoop CommitProtocol (full name HadoopMapReduceCommitProtocol) is a set of rules for committing files to a file system. Hadoop abstracts it in order to separate storage from computation and to stay compatible with different underlying file systems.
Here we assume the job is a query job that requires parallel computation. In big data computing frameworks, such a job is usually composed of multiple tasks triggered and executed as a DAG: the job is initiated by the scheduler (the JobManager), the tasks are distributed to the executors for execution, and finally the results are aggregated and returned.
We can simplify the basic execution logic of the commitProtocol into the following methods (a minimal interface sketch follows the list):

setupJob()
: Initializes the job, e.g. creating the necessary temporary directories.
setupTask()
: Initializes the task on the executor, creating task-level temporary directories, etc.
needsTaskCommit()
: Determines whether the task needs to be committed.
commitTask()
: Commits the task by moving the temporary output generated by the task to the final output path.
abortTask()
: Cleans up the temporary files generated by the task when it has to be aborted.
commitJob()
: Commits the job's final result once all tasks have been committed.
abortJob()
: Aborts the job and cleans it up.
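Expressed as an interface, the simplified protocol surface looks roughly like this (a minimal Scala sketch of the methods listed above, not the actual Hadoop OutputCommitter class):

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

// Minimal sketch of the commit protocol surface described above.
// The real Hadoop abstraction is org.apache.hadoop.mapreduce.OutputCommitter.
trait SimpleCommitProtocol {
  def setupJob(jobContext: JobContext): Unit            // create job temp dirs
  def setupTask(taskContext: TaskAttemptContext): Unit  // create task temp dirs
  def needsTaskCommit(taskContext: TaskAttemptContext): Boolean
  def commitTask(taskContext: TaskAttemptContext): Unit // publish task output
  def abortTask(taskContext: TaskAttemptContext): Unit  // clean up task temp files
  def commitJob(jobContext: JobContext): Unit           // publish the final job output
  def abortJob(jobContext: JobContext): Unit            // clean up the whole job
}
```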
The commitProtocol serves as a standard for running MapReduce jobs on top of a storage system. Because MapReduce jobs are generally large and long-running, all kinds of failures may cause individual tasks to fail, so a commitProtocol implementation also has to take data correctness and transactional consistency into account.
## FileOutputCommitter
Unlike the HadoopCommitProtocol abstraction, FileOutputCommitter focuses on the concrete commit operations (commitTask and commitJob). In native Hadoop this is implemented mainly in FileOutputCommitter, which provides two algorithms that make different trade-offs between performance and isolation.
### commitV1
During commitTask, the data files are renamed from the task's temporary directory to the job's temporary directory, and during commitJob, they are renamed from the job's temporary directory to the final directory. Since the data is well isolated until the job is completed, failed jobs can quickly recover data from the last temporary directory.
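In code, the two-step rename can be sketched like this (a schematic, spark-shell style Scala sketch over the Hadoop FileSystem API; the real FileOutputCommitter recursively merges paths and handles many more corner cases):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Schematic of the v1 algorithm: task tmp -> job tmp at commitTask,
// job tmp -> final directory at commitJob.
def commitTaskV1(fs: FileSystem, taskTmpDir: Path, jobTmpDir: Path): Unit = {
  // the task's output becomes visible only inside the job-level temporary directory
  fs.rename(taskTmpDir, new Path(jobTmpDir, taskTmpDir.getName))
}

def commitJobV1(fs: FileSystem, jobTmpDir: Path, finalDir: Path): Unit = {
  // only after every task has committed is the output moved to the final directory
  fs.listStatus(jobTmpDir).foreach { committed =>
    fs.rename(committed.getPath, new Path(finalDir, committed.getPath.getName))
  }
}
```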
### commitV2
During commitTask, the data files are renamed from the task's temporary directory directly to the job's final directory, so commitJob only needs to confirm data integrity and write the success marker. However, because the rename in the commitTask phase is externally visible, isolation is poor, and a failed job cannot be recovered quickly.
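Which algorithm is used is selected through the Hadoop configuration key `mapreduce.fileoutputcommitter.algorithm.version` (available since Hadoop 2.7); for example, from Spark:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("commit-demo").getOrCreate()

// 1 = v1 (two-step rename, better isolation), 2 = v2 (single rename, faster but visible early)
spark.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.algorithm.version", "1")
```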
## Spark CommitProtocol
The commitProtocol has concrete implementations in the classic engines of the Hadoop ecosystem, such as MapReduce (on YARN) and Spark. Since most jobs have migrated from MR to Spark, only the Spark commitProtocol is introduced here.
The spark commitProtocol is actually a wrapper based on the hadoopCommitProtocol, which adapts it to the spark computing and execution model and addresses some of the shortcomings of the hadoopCommitProtocol, improving scalability.
In addition, because it builds on the hadoopCommitProtocol, Spark naturally uses FileOutputCommitter for writing files. To integrate other committers, you can either subclass and override the commitProtocol or explicitly specify a committer class; both are done through Spark configuration.
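For example, both knobs can be set through Spark configuration (a hedged sketch: the keys below exist in recent Spark versions, `spark.sql.sources.commitProtocolClass` is an internal option, and the class names are hypothetical placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: plugging in a custom FileCommitProtocol and a custom Parquet OutputCommitter.
val spark = SparkSession.builder()
  .config("spark.sql.sources.commitProtocolClass",
          "com.example.MyCommitProtocol")           // hypothetical subclass of FileCommitProtocol
  .config("spark.sql.parquet.output.committer.class",
          "com.example.MyParquetOutputCommitter")   // hypothetical OutputCommitter for Parquet
  .getOrCreate()
```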
The differences between spark commitProtocol and hadoopCommitProtocol are:
- Ensuring the correctness of task attempts during commitTask (OutputCommitCoordinator)
Spark introduces the OutputCommitCoordinator to track the state of every task attempt in every stage. Once one task attempt has committed successfully, commit requests from any other attempt of the same task are rejected. This guarantees the correctness of commitTask execution.
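The coordination idea can be approximated as follows (a much-simplified sketch, not Spark's actual OutputCommitCoordinator implementation):

```scala
import scala.collection.mutable

// Simplified model: the driver remembers which task attempt was authorized to commit
// each (stage, partition); commit requests from any other attempt are denied.
class SimpleCommitCoordinator {
  private val authorized = mutable.Map.empty[(Int, Int), Int] // (stage, partition) -> attempt

  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case None                    => authorized((stage, partition)) = attempt; true
      case Some(a) if a == attempt => true  // the winning attempt asking again
      case Some(_)                 => false // another attempt already committed
    }
  }
}
```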
- Supporting reads and writes of Hive external table data and dynamic partition overwrite
In Hive table queries, data can be written to internal tables or external tables, and external table data is often stored in a different location. To support writing output to different paths, the Spark commitProtocol allows the absolute path of a file to be recorded via the newTaskTempFileAbsPath() method: the task writes to a temporary file while its final absolute path is recorded, and the file is moved to that absolute path when the job commits.
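The relevant hook on Spark's FileCommitProtocol looks roughly like this (the method signature matches recent Spark versions; the wrapper function and file extension below are only illustrative):

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch: a writer asks the commit protocol for a temp path whose final location is an
// absolute directory (e.g. an external table location). The protocol records the mapping
// so that the file can be moved to externalDir when the job commits.
def newExternalTableFile(committer: FileCommitProtocol,
                         taskContext: TaskAttemptContext,
                         externalDir: String): String =
  committer.newTaskTempFileAbsPath(taskContext, externalDir, ".parquet")
```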
Dynamic partition overwrite lets Spark overwrite only the partitions touched by a write instead of rewriting the whole table, avoiding unnecessary overhead.
What is dynamic partitioning:
Dynamic partitioning refers to the case where the concrete values of the partition fields are not specified in the SQL, so the partitions are inferred automatically from the data during computation.
Example:
```sql
insert overwrite table ${table_name} partition (dayno=${v_day}, hour=${v_dayhour_hh}, region)
select * from ${query_table_name}
where dayno = ${v_day};
```
Conditions for enabling dynamic partition overwrite (see the example after the list):
- spark.sql.sources.partitionOverwriteMode=dynamic
- SaveMode is overwrite
- The SQL contains dynamic partition fields (partition columns without fixed values)
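For instance, the following write satisfies all three conditions (a sketch; the table and filter values are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // condition 1
  .getOrCreate()

// Placeholder table names. mode("overwrite") satisfies condition 2, and the partition
// columns of target_table that carry no fixed value satisfy condition 3.
val df = spark.table("source_table").where("dayno = '20240101'") // placeholder filter
df.write.mode("overwrite").insertInto("target_table")
```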
Basic logic of dynamic partition overwrite (a schematic follows the list):
- When dynamic partition overwrite is enabled, Spark writes the temporary data files into a staging directory under the job directory.
- For external table data paths, before the partition files are renamed to their absolute paths, the parent directory has to be cleaned up and recreated so that the rename (overwrite) does not fail because the target directory is missing.
- For internal table data paths, the data still sits in the staging directory after commitJob, so all files and directories must be renamed from the staging directory to the final directory.
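Schematically, the internal-table part of the flow can be sketched as follows (a hedged sketch over the Hadoop FileSystem API with a hypothetical staging layout; the real logic lives in Spark's InsertIntoHadoopFsRelationCommand and HadoopMapReduceCommitProtocol and handles nested partitions and more edge cases):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: after commitJob, the partition directories written under the staging directory
// replace only the matching partitions of the final table path.
def overwriteDynamicPartitions(fs: FileSystem, stagingDir: Path, tablePath: Path): Unit = {
  // hypothetical layout: stagingDir contains partition dirs such as region=xx/
  fs.listStatus(stagingDir).foreach { part =>
    val target = new Path(tablePath, part.getPath.getName)
    if (fs.exists(target)) fs.delete(target, true) // drop only the partition being overwritten
    fs.rename(part.getPath, target)                // move the staged partition into place
  }
  fs.delete(stagingDir, true) // clean up the staging directory
}
```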
Reference issue: [SPARK-35106]
## Existing Issues
As discussed in a previous article, writing data to S3 with Hadoop MapReduce or Spark runs into serious performance problems.
S3 provides only six basic operations, and every other operation is derived from them; for example, `rename = list + copy + delete`. This means that rename is not atomic: renaming a single file has to be implemented as COPY + DELETE on S3, and renaming a directory additionally requires a LIST of all the files under it. As a result, rename is non-atomic and gets slower as the number of files grows (sketched in code after the table below).
Operation | Explanation
---|---
PUT | Atomic write operation for an object
GET | Get an object
HEAD | Get the metadata of an object
LIST | List all objects under a prefix
COPY | Copy a single object
DELETE | Delete an object
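To make the cost concrete, "renaming" a directory on S3 decomposes into per-object LIST + COPY + DELETE calls, roughly like this (a sketch using the AWS SDK for Java v1 from Scala; the bucket name and prefixes are placeholders, and real code must also handle paginated listings):

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import scala.jdk.CollectionConverters._

val s3 = AmazonS3ClientBuilder.defaultClient()
val (bucket, srcPrefix, dstPrefix) = ("my-bucket", "src/", "dst/") // placeholders

// "rename" s3://my-bucket/src/ to s3://my-bucket/dst/ : one COPY + DELETE per object
s3.listObjects(bucket, srcPrefix).getObjectSummaries.asScala.foreach { obj => // LIST
  val dstKey = dstPrefix + obj.getKey.stripPrefix(srcPrefix)
  s3.copyObject(bucket, obj.getKey, bucket, dstKey)                           // COPY
  s3.deleteObject(bucket, obj.getKey)                                         // DELETE
}
```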
The rename operation is essential to the traditional commit algorithms, so this directly affects both the performance and the correctness of the commit. In addition, Spark has its own staging step: in dynamic partition overwrite, staging is required to guarantee data consistency, but it also relies on file renames, which incur significant overhead on S3.
## S3A Committer
With the increase in data processing scale on S3, in addition to the six basic operations mentioned above, AWS also provides two additional operations to deal with large-scale data transfers.
- Bulk Delete
- Multipart Upload
The latter is the key to solving the performance problems of committing files on S3.
Let's briefly introduce the Multipart Upload mechanism (referred to as MPU):
Multipart Upload mainly consists of three stages:
- Initialization
- Upload parts
- Complete the final POST operation
Refer to the AWS official documentation for more details.
During the upload, the uploaded parts are not visible in S3, although they do occupy real storage. Only when the final POST completes is the object's manifest written, at which point the file becomes visible on S3.
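For reference, the three stages map onto the AWS SDK roughly as follows (a minimal sketch using the AWS SDK for Java v1 from Scala, with a single part; the bucket, key, and local file are placeholders):

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model._
import java.io.File
import scala.jdk.CollectionConverters._

val s3 = AmazonS3ClientBuilder.defaultClient()
val (bucket, key) = ("my-bucket", "output/part-00000.parquet") // placeholders

// 1. Initialization: obtain an upload id.
val uploadId = s3.initiateMultipartUpload(
  new InitiateMultipartUploadRequest(bucket, key)).getUploadId

// 2. Upload parts: each part returns an ETag; nothing is visible on S3 yet.
val part = s3.uploadPart(new UploadPartRequest()
  .withBucketName(bucket).withKey(key).withUploadId(uploadId)
  .withPartNumber(1)
  .withFile(new File("/tmp/part-00000")) // placeholder local file
  .withPartSize(5 * 1024 * 1024))

// 3. Final POST: completing the upload writes the manifest and makes the object visible.
s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
  bucket, key, uploadId, List(part.getPartETag).asJava))
```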
Hadoop's developers take full advantage of this mechanism in the design of two S3A committers: the Staging Committer and the Magic Committer.
### Staging Committer
In the stagingCommitter, a task attempt computes against data on local disk, and the output is uploaded to S3 during commitTask. The files are MPU'd directly to the final job directory, but the information about the uploads is written to HDFS and driven through the traditional FileOutputCommitter v1 algorithm. During the commitJob phase, the final rename is performed on HDFS, and the final POST operations are executed to make the uploaded files visible on S3.
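Selecting the staging committer is a matter of S3A configuration; a hedged sketch (the keys follow the Hadoop S3A committer documentation, and the chosen values are just one option):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: route s3a output through the S3A committer factory and pick the
// directory-staging committer.
val spark = SparkSession.builder()
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .config("spark.hadoop.fs.s3a.committer.name", "directory")               // staging variant
  .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append") // or replace / fail
  .getOrCreate()
```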
### Magic Committer
In the magicCommitter, data is allowed to be written directly to S3 from the start, without landing on local disk. The magicCommitter creates a `_magic` directory under the job directory, and the files output by a taskAttempt are MPU'd to this directory in the form of magic file streams. At this point the MPU is not yet complete, and its manifest is written to a `.pending` file with the same name as the data file (also under the `_magic` directory). During commitTask, since the file data has already been uploaded to S3, the committer only needs to load all the `.pending` files under the taskAttempt path and aggregate them into a `.pendingset` file for upload. During the commitJob phase, all `.pendingset` files are read to obtain the manifests of every file to be committed, and the final POST operations are then completed one by one.
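Enabling the magic committer is similar (again a hedged sketch; the keys follow the Hadoop S3A documentation, and magic path support must be enabled for the target bucket):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: switch the S3A committer to "magic", which writes through special magic paths on S3.
val spark = SparkSession.builder()
  .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
          "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .config("spark.hadoop.fs.s3a.committer.name", "magic")
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true") // allow magic paths
  .getOrCreate()
```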
For more detail, you can read the source code of the committer implementations in Hadoop.
In summary, the S3A committers avoid rename operations by deferring the final publication of the files. On the current version of S3, both committers solve the performance problems. However, Hadoop's committer implementations aim for generality and do not provide much support for specialized business scenarios (in other words, they are not ready to use out of the box): for example, Spark's dynamic partition overwrite and writing data to Hive external tables still have to be implemented by users themselves.