Spark concurrent write to same HDFS path

The problem

Sometimes you need to run a scenario in which several Spark tasks write data to the same HDFS path.

While the tasks are running, you may encounter errors such as:

org.apache.spark.sql.execution.datasources.FileFormatWriter  - Aborting job null.
java.io.IOException: Failed to rename FileStatus{path=hdfs://.../_temporary/0/task_20211021124815_0009_m_000000
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:463)
org.apache.spark.sql.execution.datasources.FileFormatWriter  - Aborting job null.
java.io.FileNotFoundException: File hdfs://.../_temporary/0 does not exist.
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:748)
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /…/_temporary/0/_temporary/attempt_20211021124815_0013_m_000000_0/… (inode 1231231231): File does not exist. Holder DFSClient_attempt_20211021124815_0013_m_000000_0_-1231231231_32 does not have any open files.

Suppose we have one Spark task that writes to the hdfs://data/test directory.

At runtime, Spark creates a temporary directory: hdfs://data/test/_temporary/0.

This is the path where our task temporarily places its data. After the task finishes writing, the data is automatically moved from the temporary directory hdfs://data/test/_temporary/0 to the target hdfs://data/test.
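The write-then-commit flow can be sketched in plain Python, using local directories as stand-ins for the HDFS paths; commit_job is a hypothetical helper that only models what Hadoop's FileOutputCommitter does, not its real API:

```python
import os
import shutil
import tempfile

def commit_job(target: str) -> None:
    """Move finished files from <target>/_temporary/0 into <target>,
    then delete the temporary directory -- a simplified model of
    Hadoop's FileOutputCommitter commit step."""
    temp = os.path.join(target, "_temporary", "0")
    for name in os.listdir(temp):
        shutil.move(os.path.join(temp, name), os.path.join(target, name))
    shutil.rmtree(os.path.join(target, "_temporary"))

# Single-writer flow: write into the temporary path, then commit.
target = tempfile.mkdtemp()                      # stands in for hdfs://data/test
temp = os.path.join(target, "_temporary", "0")
os.makedirs(temp)
open(os.path.join(temp, "part-00000"), "w").close()
commit_job(target)
print(sorted(os.listdir(target)))                # → ['part-00000']
```

With a single writer this works perfectly: the temporary directory exists for exactly as long as the job needs it.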

Now let’s imagine we have three tasks: task1, task2 and task3. Each of them should eventually write data to hdfs://data/test.

We run our three tasks task1, task2 and task3. Spark will create a single temporary directory hdfs://data/test/_temporary/0, shared by all of them.

Let’s say task2 finishes faster than the others. It moves all of its data from the temporary directory hdfs://data/test/_temporary/0 to the target hdfs://data/test, and then deletes the temporary directory.

Thus, the temporary directory disappears out from under task1 and task3 (because it is shared by all tasks), and the errors shown at the beginning of the article appear.
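The race can be reproduced deterministically with the same toy model (local directories stand in for HDFS paths, and the file names are illustrative): task2 commits first and removes the shared _temporary directory, so task1's later write fails:

```python
import os
import shutil
import tempfile

target = tempfile.mkdtemp()                 # stands in for hdfs://data/test
temp = os.path.join(target, "_temporary", "0")
os.makedirs(temp)

# task2 finishes first: it moves its file and deletes the shared temp dir.
open(os.path.join(temp, "part-task2"), "w").close()
shutil.move(os.path.join(temp, "part-task2"),
            os.path.join(target, "part-task2"))
shutil.rmtree(os.path.join(target, "_temporary"))

# task1 now tries to write into a temp dir that no longer exists.
try:
    open(os.path.join(temp, "part-task1"), "w").close()
    error = None
except FileNotFoundError as e:
    error = e
print(type(error).__name__)                 # → FileNotFoundError
```

On a real cluster the failure surfaces as the rename/FileNotFoundException/LeaseExpiredException errors quoted above, but the root cause is the same: one job's commit destroys the temporary directory the others still depend on.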

Solutions

Approach one

Write not to one HDFS directory (hdfs://data/test in the example above), but to different ones: hdfs://data/test/0, hdfs://data/test/1 and hdfs://data/test/2 if we have three tasks running simultaneously.

After all tasks complete, move the data from subfolders 0, 1 and 2 up into hdfs://data/test.
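The merge step can be sketched like this (again with local directories standing in for the HDFS paths; on a real cluster you would use hdfs dfs -mv or the Hadoop FileSystem API instead of shutil):

```python
import os
import shutil
import tempfile

target = tempfile.mkdtemp()                     # stands in for hdfs://data/test

# Each task wrote to its own subdirectory, so no _temporary dir was shared.
for i in range(3):
    sub = os.path.join(target, str(i))
    os.makedirs(sub)
    open(os.path.join(sub, f"part-{i:05d}"), "w").close()

# After all tasks have finished, move the files up into the target directory.
for i in range(3):
    sub = os.path.join(target, str(i))
    for name in os.listdir(sub):
        shutil.move(os.path.join(sub, name), os.path.join(target, name))
    os.rmdir(sub)

print(sorted(os.listdir(target)))  # → ['part-00000', 'part-00001', 'part-00002']
```

The key point is that each task gets its own _temporary directory inside its own subfolder, so no task can delete another task's in-flight data.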

Approach two

Write your own implementation of OutputCommitter. Details are in the article: https://mchesnavsky.tech/spark-custom-parquet-outputcommiter/.
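A minimal configuration sketch, assuming you have compiled a custom committer and put its JAR on the classpath; the class name com.example.ConcurrentSafeParquetOutputCommitter is hypothetical, while spark.sql.parquet.output.committer.class is the real Spark SQL key for plugging a committer into Parquet writes:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("concurrent-write")
    # The class must extend org.apache.parquet.hadoop.ParquetOutputCommitter;
    # the one named here is a hypothetical custom implementation.
    .config("spark.sql.parquet.output.committer.class",
            "com.example.ConcurrentSafeParquetOutputCommitter")
    .getOrCreate()
)
```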

If you still have any questions, feel free to ask me in the comments under this article, or write to me at promark33@gmail.com.

If I saved your day, you can support me :)
