Writing files with PySpark can be confusing at first. Generating a single output file from your dataframe (with a name of your choice) can be surprisingly challenging and is not the default behaviour.
This post explains how to save the output of a PySpark DataFrame to a single, neatly organized file with a name of your choice, and how to do it efficiently.
💻 All code snippets in this post are available in the engineeringfordatascience GitHub repo
PySpark’s default behaviour when writing files
When you call PySpark’s ‘write’ method, your dataframe is not written to a single file.
Instead, it is saved to a new directory containing your data split across multiple files – one for each partition. Each of these files is given an auto-generated name starting with ‘part-…’.
For example:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# create spark session
spark = SparkSession.builder.getOrCreate()

# generate example dataframe with an id column and three random columns
df = spark.range(100).select(F.col("id"))
df = df.select("*", *(F.rand(1).alias("col_" + str(target)) for target in range(3)))

# repartition to demonstrate saving a dataframe with multiple partitions
df = df.repartition(5)

# write outputs to csv
df.write.csv('test')
In this case, the output is saved to five separate files in a directory called test
# ls test
_SUCCESS
part-00001-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv
part-00003-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv
part-00000-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv
part-00002-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv
part-00004-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv
I found this confusing and unintuitive at first. Coming from Python packages like Pandas, I was used to running pd.to_csv and receiving my data in a single output CSV file. With PySpark (admittedly without much thought), I expected the same thing to happen when I ran df.write.csv.
PySpark is designed to work with very large datasets, with processing distributed across many executors. Data is stored across different partitions, so it is more efficient for PySpark to parallelise the write by having each executor dump its own partitions to disk (therefore creating many files), rather than trying to combine the data into one file first.
Writing one file per partition is also more efficient when reading the data back later, as the read can be parallelised too.
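For example, Spark reads all of the part files back in parallel simply by pointing at the directory (a quick sketch, reusing the 'test' directory written above):

# read every part file in the 'test' directory as one dataset;
# the read is parallelised across the part files
df_read = spark.read.csv("test")
print(df_read.count())  # 100 rows, however many part files were written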
Ok, this makes sense. But it doesn’t help when my output data is relatively small and I just want a single output file to share with stakeholders for easier analysis.
So how can we get around this default behaviour?
❌ Writing to a single file with repartition(1) or coalesce
Before going into the final solution, let’s discuss the ‘naive’ approach for writing a single file – using repartition(1) or coalesce(1) – and why it might not work so well.
# repartition dataframe to create single partition
df = df.repartition(1)
# write outputs to csv
df.write.csv('test')
This still creates a new directory, albeit with just one data file inside.
# ls test
_SUCCESS
part-00000-51e0b5f6-53cf-49c4-b9e9-294d7dd7603f-c000.csv
Repartitioning the dataframe to move all data to a single partition before saving does create a single output file. But we still have three problems:
- It is still saved into a folder (not just an individual file)
- We still don’t have control of the name of the actual file; it is still named ‘part-…csv’
- Saving the data after repartitioning/coalescing onto a single partition can be inefficient for large dataframes.
To expand on the final point: after applying repartition(1) or coalesce(1), writing the data is no longer parallelised. The work of saving the dataframe is ‘hot-spotted’ onto a single executor, which can greatly impact write performance for large datasets.
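For completeness, the coalesce(1) variant looks like this (a minimal sketch; 'test_coalesce' is just an illustrative output path). coalesce(1) avoids the full shuffle that repartition(1) triggers, but the write itself is still performed by a single task:

# coalesce avoids a full shuffle when reducing the number of partitions,
# but the final write still runs on a single executor
df.coalesce(1).write.csv('test_coalesce')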
✅ Writing PySpark dataframe to a single file efficiently: Copy Merge Into
To get around these issues we can use the following approach:
- Save the dataframe as normal but to a temporary directory
- Use Hadoop file system commands via the py4j.java_gateway API to efficiently merge the partitioned data into a single file
- Delete the temporary directory
Copy Merge Into
The main copy_merge_into function below takes the files saved in a given HDFS directory and merges them into a single output file with an arbitrary name.
from typing import List

from py4j.java_gateway import JavaObject
from pyspark.sql import SparkSession


def configure_hadoop(spark: SparkSession):
    """Return handles to the Hadoop JVM package, configuration and file system."""
    hadoop = spark.sparkContext._jvm.org.apache.hadoop  # type: ignore
    conf = hadoop.conf.Configuration()
    fs = hadoop.fs.FileSystem.get(conf)
    return hadoop, conf, fs


def ensure_exists(spark: SparkSession, file: str):
    """Create an empty file at the given path if it does not already exist."""
    hadoop, _, fs = configure_hadoop(spark)
    if not fs.exists(hadoop.fs.Path(file)):
        out_stream = fs.create(hadoop.fs.Path(file), False)  # False means don't overwrite
        out_stream.close()


def delete_location(spark: SparkSession, location: str):
    """Recursively delete the given path if it exists."""
    hadoop, _, fs = configure_hadoop(spark)
    if fs.exists(hadoop.fs.Path(location)):
        fs.delete(hadoop.fs.Path(location), True)


def get_files(spark: SparkSession, src_dir: str) -> List[JavaObject]:
    """Get list of files in HDFS directory"""
    hadoop, _, fs = configure_hadoop(spark)
    ensure_exists(spark, src_dir)
    files = []
    for f in fs.listStatus(hadoop.fs.Path(src_dir)):
        if f.isFile():
            files.append(f.getPath())
    if not files:
        raise ValueError("Source directory {} is empty".format(src_dir))
    return files


def copy_merge_into(
    spark: SparkSession, src_dir: str, dst_file: str, delete_source: bool = True
):
    """Merge files from HDFS source directory into single destination file

    Args:
        spark: SparkSession
        src_dir: path to the directory where dataframe was saved in multiple parts
        dst_file: path to single file to merge the src_dir contents into
        delete_source: flag for deleting src_dir and contents after merging
    """
    hadoop, conf, fs = configure_hadoop(spark)

    # 1. Get list of files in the source directory
    files = get_files(spark, src_dir)

    # 2. Set up the 'output stream' for the final merged output file
    # if destination file already exists, add contents of that file to the output stream
    if fs.exists(hadoop.fs.Path(dst_file)):
        tmp_dst_file = dst_file + ".tmp"
        tmp_in_stream = fs.open(hadoop.fs.Path(dst_file))
        tmp_out_stream = fs.create(hadoop.fs.Path(tmp_dst_file), True)
        try:
            hadoop.io.IOUtils.copyBytes(
                tmp_in_stream, tmp_out_stream, conf, False
            )  # False means don't close out_stream
        finally:
            tmp_in_stream.close()
            tmp_out_stream.close()

        tmp_in_stream = fs.open(hadoop.fs.Path(tmp_dst_file))
        out_stream = fs.create(hadoop.fs.Path(dst_file), True)
        try:
            hadoop.io.IOUtils.copyBytes(tmp_in_stream, out_stream, conf, False)
        finally:
            tmp_in_stream.close()
            fs.delete(hadoop.fs.Path(tmp_dst_file), False)
    # if file doesn't already exist, create a new empty file
    else:
        out_stream = fs.create(hadoop.fs.Path(dst_file), False)

    # 3. Merge files from source directory into the merged file 'output stream'
    try:
        for file in files:
            in_stream = fs.open(file)
            try:
                hadoop.io.IOUtils.copyBytes(
                    in_stream, out_stream, conf, False
                )  # False means don't close out_stream
            finally:
                in_stream.close()
    finally:
        out_stream.close()

    # 4. Tidy up - delete the original source directory
    if delete_source:
        delete_location(spark, src_dir)
There are four main parts to the copy_merge_into function:
- Get a list of the files in the given source directory
- Create a new file (the merged ‘output stream’) to combine the multiple files into
- Merge the files from the source directory into the new file
- Delete the original source directory
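In its simplest form, usage looks like this (a minimal sketch, assuming the spark session and df from earlier; 'tmp_output' and 'output.csv' are just illustrative paths):

# write the dataframe to a temporary directory as normal (many part files)
df.write.csv("tmp_output")

# merge the part files into a single file and delete the temporary directory
copy_merge_into(spark, "tmp_output", "output.csv", delete_source=True)

Note that a plain df.write.csv does not include a header row, which is why the full example below writes the headers to the output file separately first.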
Putting it together
An example script for saving your dataframe to a single CSV file with headers could look something like this:
from pyspark.sql import types as T

TMP_OUTPUT_DIR = "test"
OUTPUT_FILE = "test.csv"

# your dataframe (repartitioning for demo purposes only)
df = df.repartition(5)

# write headers first (required for csv only)
headers = spark.createDataFrame(
    data=[[f.name for f in df.schema.fields]],
    schema=T.StructType(
        [T.StructField(f.name, T.StringType(), False) for f in df.schema.fields]
    ),
)
headers.write.csv(TMP_OUTPUT_DIR)

# merge the csv headers into the output file first
copy_merge_into(
    spark,
    TMP_OUTPUT_DIR,
    OUTPUT_FILE,
    delete_source=True,
)

# write main outputs
# dataframe written to TMP_OUTPUT_DIR folder in 5 separate csv files (one for each partition)
df.write.csv(TMP_OUTPUT_DIR)

# merge the main csv files in the folder into the single output file
copy_merge_into(
    spark,
    TMP_OUTPUT_DIR,
    OUTPUT_FILE,
    delete_source=True,
)
This script will save your PySpark dataframe to a single output file called test.csv
Note: This approach is compatible with HDFS and local file systems (e.g. for testing)
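As a quick sanity check, you can read the merged file straight back (a small sketch, assuming the test.csv output and spark session from above):

# read the merged file back, treating the first row as the header
check_df = spark.read.csv("test.csv", header=True)
check_df.show(5)
print(check_df.count())  # should match the row count of the original dataframe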
Happy coding!
Resources
- Code inspired by https://github.com/Tagar/abalon/blob/master/abalon/spark/sparkutils.py
- Code snippets available in the e4ds-snippets GitHub repo