Writing files with PySpark can be confusing at first. Generating a single output file from your dataframe (with a name of your choice) can be surprisingly challenging and is not the default behaviour.

This blog explains how to save the output of a PySpark DataFrame to a single, neatly organized file with a name of your choice and in an efficient manner.

💻 All code snippets in this post are available in the engineeringfordatascience GitHub repo

PySpark’s default behaviour when writing files

When you call PySpark’s ‘write’ method, your dataframe will not be written to a single file.

Instead, it is saved to a new directory, inside of which your data is split across multiple files – one for each partition. Each of these files is given a generated name starting with ‘part-…’.

For example:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# create spark session
spark = SparkSession.builder.getOrCreate()

# generate example dataframe
df = spark.range(100).select(F.col("id"))
df = df.select("*", *(F.rand(1).alias("col_" + str(target)) for target in range(3)))

# repartition to demonstrate saving dataframe with multiple partitions
df = df.repartition(5)

# write outputs to csv
df.write.csv('test')

In this case, the output is saved to five separate files in a directory called test:

# ls test

_SUCCESS
part-00001-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv  
part-00003-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv
part-00000-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv  
part-00002-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv  
part-00004-6214c8be-5323-496b-b76f-9e1ddb15f9c0-c000.csv

I found this confusing and unintuitive at first. Coming from Python packages like Pandas, I was used to running pd.to_csv and receiving my data in a single output CSV file.

With PySpark (admittedly without much thought), I expected the same thing to happen when I ran df.write.csv.

PySpark is designed to work with very large datasets, with processing distributed across many executors. Data is stored across different partitions, so it is more efficient for PySpark to parallelise the write by having each executor dump its own partitions to disk (therefore creating many files), rather than trying to combine the data into one file first.

Creating one file per partition also makes reading the data back later more efficient, as the read can be parallelised too.
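
As a quick sanity check, the number of ‘part-’ files in the output directory matches the number of partitions in the dataframe, which you can inspect directly:

# check how many partitions the dataframe has
# (5 for the repartition(5) example above – hence five output files)
print(df.rdd.getNumPartitions())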

Ok, this makes sense. But it doesn’t help when my output data is relatively small and I just want it in a single output file to share with stakeholders for easier analysis.

So how can we get around this default behaviour?

❌ Writing to a single file with repartition(1) or coalesce(1)

Before going into the final solution, let’s discuss the ‘naive’ approach for writing a single file – using repartition(1) or coalesce(1) – and why it might not work so well.

# repartition dataframe to create single partition
df = df.repartition(1)

# write outputs to csv
df.write.csv('test')

This still creates a new directory, albeit with just one data file inside (plus the _SUCCESS marker).

# ls test

_SUCCESS
part-00000-51e0b5f6-53cf-49c4-b9e9-294d7dd7603f-c000.csv

Repartitioning the dataframe to move all data to a single partition before saving does create a single output file. But we still have three problems:

  1. It is still saved into a folder (not just an individual file)
  2. We still don’t have control over the name of the actual file; it is still named ‘part-…csv’
  3. Saving the data after repartitioning/coalescing onto a single partition can be inefficient for large dataframes.

To expand on the final point: after applying repartition(1) or coalesce(1), the write will no longer be parallelised. The work of saving the dataframe is ‘hot-spotted’ onto a single executor, which can greatly impact write performance for large datasets.
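
For reference, a coalesce(1) version of the example above would be a sketch along these lines; it avoids the full shuffle triggered by repartition(1), but the write is still funnelled through a single task and produces the same directory-plus-part-file layout:

# coalesce to a single partition (no full shuffle, but the write is still not parallelised)
df = df.coalesce(1)

# still creates a 'test' directory containing a single 'part-...' csv file
df.write.csv('test')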

✅ Writing PySpark dataframe to a single file efficiently: Copy Merge Into

To get around these issues we can use the following approach:

  1. Save the dataframe as normal but to a temporary directory
  2. Use Hadoop FileSystem commands (accessed via the py4j.java_gateway API) to efficiently merge the partitioned data into a single file
  3. Delete the temporary directory

Copy Merge Into

The main copy_merge_into function below takes files saved in a given HDFS directory and merges them into a single output file with an arbitrary name.

from typing import List

from py4j.java_gateway import JavaObject
from pyspark.sql import SparkSession


def configure_hadoop(spark: SparkSession):
    hadoop = spark.sparkContext._jvm.org.apache.hadoop  # type: ignore
    conf = hadoop.conf.Configuration()
    fs = hadoop.fs.FileSystem.get(conf)
    return hadoop, conf, fs


def ensure_exists(spark: SparkSession, file: str):
    hadoop, _, fs = configure_hadoop(spark)
    if not fs.exists(hadoop.fs.Path(file)):
        out_stream = fs.create(hadoop.fs.Path(file), False)
        out_stream.close()


def delete_location(spark: SparkSession, location: str):
    hadoop, _, fs = configure_hadoop(spark)
    if fs.exists(hadoop.fs.Path(location)):
        fs.delete(hadoop.fs.Path(location), True)


def get_files(spark: SparkSession, src_dir: str) -> List[JavaObject]:
    """Get list of files in HDFS directory"""
    hadoop, _, fs = configure_hadoop(spark)
    ensure_exists(spark, src_dir)
    files = []
    for f in fs.listStatus(hadoop.fs.Path(src_dir)):
        if f.isFile():
            files.append(f.getPath())
    if not files:
        raise ValueError("Source directory {} is empty".format(src_dir))

    return files


def copy_merge_into(
    spark: SparkSession, src_dir: str, dst_file: str, delete_source: bool = True
):
    """Merge files from HDFS source directory into single destination file

    Args:
        spark: SparkSession
        src_dir: path to the directory where dataframe was saved in multiple parts
        dst_file: path to single file to merge the src_dir contents into
        delete_source: flag for deleting src_dir and contents after merging

    """
    hadoop, conf, fs = configure_hadoop(spark)

    # 1. Get list of files in the source directory
    files = get_files(spark, src_dir)

    # 2. Set up the 'output stream' for the final merged output file
    # if destination file already exists, add contents of that file to the output stream
    if fs.exists(hadoop.fs.Path(dst_file)):
        tmp_dst_file = dst_file + ".tmp"
        tmp_in_stream = fs.open(hadoop.fs.Path(dst_file))
        tmp_out_stream = fs.create(hadoop.fs.Path(tmp_dst_file), True)
        try:
            hadoop.io.IOUtils.copyBytes(
                tmp_in_stream, tmp_out_stream, conf, False
            )  # False means don't close out_stream
        finally:
            tmp_in_stream.close()
            tmp_out_stream.close()

        tmp_in_stream = fs.open(hadoop.fs.Path(tmp_dst_file))
        out_stream = fs.create(hadoop.fs.Path(dst_file), True)
        try:
            hadoop.io.IOUtils.copyBytes(tmp_in_stream, out_stream, conf, False)
        finally:
            tmp_in_stream.close()
            fs.delete(hadoop.fs.Path(tmp_dst_file), False)
    # if file doesn't already exist, create a new empty file
    else:
        out_stream = fs.create(hadoop.fs.Path(dst_file), False)

    # 3. Merge files from source directory into the merged file 'output stream'
    try:
        for file in files:
            in_stream = fs.open(file)
            try:
                hadoop.io.IOUtils.copyBytes(
                    in_stream, out_stream, conf, False
                )  # False means don't close out_stream
            finally:
                in_stream.close()
    finally:
        out_stream.close()

    # 4. Tidy up - delete the original source directory
    if delete_source:
        delete_location(spark, src_dir)

There are four main parts to the copy_merge_into function:

  1. Get a list of the files in the source directory
  2. Set up an output stream for the destination file, preserving the contents of the destination file if it already exists
  3. Merge the files from the source directory into the output stream
  4. Delete the original source directory
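
For the simplest case – a CSV with no header row – usage boils down to two calls (the directory and file names below are just for illustration):

# write the dataframe to a temporary directory (one headerless csv file per partition)
df.write.csv("tmp_output")

# merge the part files into a single csv and delete the temporary directory
copy_merge_into(spark, "tmp_output", "merged.csv", delete_source=True)

Adding a header row takes one extra step, shown in the full example in the next section.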

Putting it together

An example script for saving your dataframe to a single CSV file with headers could look something like this:

from pyspark.sql import types as T

TMP_OUTPUT_DIR = "test"
OUTPUT_FILE = "test.csv"

# your dataframe (repartitioning for demo purposes only)
df = df.repartition(5)

# write headers first (required for csv only)
headers = spark.createDataFrame(
    data=[[f.name for f in df.schema.fields]],
    schema=T.StructType(
        [T.StructField(f.name, T.StringType(), False) for f in df.schema.fields]
    ),
)
headers.write.csv(TMP_OUTPUT_DIR)

# merge the header file into the single output file first
copy_merge_into(
    spark,
    TMP_OUTPUT_DIR,
    OUTPUT_FILE,
    delete_source=True,
)

# Write main outputs
# dataframe written to TMP_OUTPUT_DIR folder in 5 separate csv files (one for each partition)
df.write.csv(TMP_OUTPUT_DIR)

# merge main csv files in folder into single file
copy_merge_into(
    spark,
    TMP_OUTPUT_DIR,
    OUTPUT_FILE,
    delete_source=True,
)

This script will save your PySpark dataframe to a single output file called test.csv.
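
As a quick check, you can read the merged file back with Spark – header=True picks up the header row that was merged in first (the check_df name is just illustrative):

# read the merged csv back to verify the contents
check_df = spark.read.csv(OUTPUT_FILE, header=True)
check_df.show(5)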

Note: This approach is compatible with HDFS and local file systems (e.g. for testing)

Happy coding!
