Update Nov 17, 2023 - Added results using the latest DataFusion version.

Some time ago, we came across an intriguing Parquet sorting test shared by Mimoune Djouallah (@mim_djo) on Twitter. The test consists of reading a Parquet file, sorting the table, and writing the sorted data back to a Parquet file on disk. You can find the original post here.


The dataset is the lineitem table of the TPC-H benchmark at scale factor (SF) 10, which comprises 59,986,052 rows. The data was generated using DuckDB's TPC-H extension and saved as Parquet with "snappy" compression and a row group size of 122,880.
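The generation step is not shown in the post. A minimal sketch of how it could be done with DuckDB's tpch extension, assuming an in-memory database; the function names and the output path argument are hypothetical, and the SQL builder is kept separate so it can be inspected without running DuckDB:

```python
def build_lineitem_export_sql(scale_factor: int, output_path: str) -> list[str]:
    """Return DuckDB statements that generate TPC-H data and export lineitem."""
    return [
        "INSTALL tpch",
        "LOAD tpch",
        # dbgen creates the eight TPC-H tables in the current database
        f"CALL dbgen(sf = {scale_factor})",
        (
            f"COPY lineitem TO '{output_path}' "
            "(FORMAT PARQUET, COMPRESSION 'snappy', ROW_GROUP_SIZE 122880)"
        ),
    ]


def generate_lineitem(scale_factor: int, output_path: str) -> None:
    """Run the statements above in an in-memory DuckDB database (assumption)."""
    import duckdb  # lazy import: the SQL builder itself needs no third-party package

    with duckdb.connect() as conn:
        for statement in build_lineitem_export_sql(scale_factor, output_path):
            conn.execute(statement)
```

Calling generate_lineitem(10, "lineitem.parquet") would correspond to the SF 10 setting described above.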

Given the manageable size of the table, the sorting operation fits in memory, though this might not be the case for larger scale factors. To shed light on different approaches, we compared implementations using several Python packages: DuckDB, Tableau Hyper, chDB, Polars, PyArrow, GlareDB and DataFusion.

Here are the language and package versions used:

Python 3.11.5 | packaged by conda-forge | (main, Aug 27 2023, 03:34:09) [GCC 12.3.0] on linux
duckdb          : 0.9.2
tableauhyperapi : 0.0.18161
chdb            : 0.15.0
polars          : 0.19.13
pyarrow         : 14.0.1
glaredb         : 0.6.1
datafusion      : 33.0.0

The code was executed on a Linux laptop with the following specifications:

OS : Linux Mint 21.1, based on Ubuntu 22.04  
CPU : 12th Gen Intel© Core™ i9-12900H (10 cores)    
RAM : 32 GB  
Data disk : Samsung SSD 980 PRO 1TB  

Below are snippets of the code using different libraries for the sorting operation.

Code snippets

Imports:

import os
import chdb
from datafusion import SessionContext
import duckdb
import glaredb
import polars as pl
import pyarrow as pa
import pyarrow.parquet as pq
from tableauhyperapi import Connection, CreateMode, HyperProcess, Telemetry

Input and output parquet files:

data_dir_path = "/home/francois/Data/dbbenchdata/tpch_10/"
input_file_path =  os.path.join(data_dir_path, "lineitem.parquet")
output_file_path = os.path.join(data_dir_path, "lineitem_sorted.parquet")

DuckDB

duckdb_file_path = os.path.join(data_dir_path, "data.duckdb")
with duckdb.connect(database=duckdb_file_path, read_only=False) as conn:
    conn.execute(
        f"""COPY (SELECT * FROM read_parquet('{input_file_path}')
        ORDER BY l_shipdate ASC)
        TO '{output_file_path}' (FORMAT PARQUET)"""
    )

Remark: we passed a database file name to the connect() method, which allows persistent data to be read or written, although no persistent data is involved in the present case. Similarly, in the next sub-section, we also give a file path to the Hyper engine.

Hyper

hyper_file_path = os.path.join(data_dir_path, "data.hyper")
with HyperProcess(
    telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU,
) as hyper:
    with Connection(
        endpoint=hyper.endpoint,
        database=hyper_file_path,
        create_mode=CreateMode.CREATE_AND_REPLACE,
    ) as conn:
        sql = f"""COPY
        (SELECT * FROM external('{input_file_path}')
        ORDER BY l_shipdate ASC)
        TO '{output_file_path}' WITH ( FORMAT => 'parquet')
        """
        _ = conn.execute_command(sql)

Note that we also tried a version with a temporary external table, but the results were not significantly different:

with HyperProcess(
    telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU,
) as hyper:
    with Connection(
        endpoint=hyper.endpoint,
        database=hyper_file_path,
        create_mode=CreateMode.CREATE_AND_REPLACE,
    ) as conn:
        sql = f"""CREATE TEMPORARY EXTERNAL TABLE lineitem
                FOR '{input_file_path}'
                WITH (FORMAT => 'parquet');"""
        _ = conn.execute_command(sql)
        sql = f"""COPY (SELECT * FROM lineitem ORDER BY l_shipdate ASC)
        TO '{output_file_path}' WITH ( FORMAT => 'parquet')"""
        _ = conn.execute_command(sql)

chDB (ClickHouse)

sql = f"""SELECT * FROM file ('{input_file_path}', Parquet)
ORDER BY l_shipdate
INTO OUTFILE '{output_file_path}'  FORMAT Parquet  """
_ = chdb.query(sql, "JSON")

Polars

pl.read_parquet(input_file_path).sort("l_shipdate").write_parquet(output_file_path)

We also experimented with the streaming API, which seemed promising:

pl.scan_parquet(input_file_path).sort("l_shipdate").sink_parquet(
    path=output_file_path
)

However, it crashed:

thread '<unnamed>' panicked at crates/polars-pipe/src/executors/sinks/io.rs:149:49:
called `Result::unwrap()` on an `Err` value: Io(Os { code: 28, kind: StorageFull, message: "No space left on device" })
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

Although there was ample space available on the device, we think that the issue stemmed from the API attempting to write numerous small files to disk.
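A "No space left on device" error can be raised when a filesystem runs out of inodes as well as when it runs out of bytes, which would be consistent with many small files being written. Whether that was the cause here is not established. A minimal sketch for checking both figures on a Unix system; the helper name filesystem_headroom is hypothetical:

```python
import os
import shutil


def filesystem_headroom(path: str) -> dict:
    """Return free space and free inodes for the filesystem that holds `path`."""
    usage = shutil.disk_usage(path)  # named tuple: total, used, free (bytes)
    stats = os.statvfs(path)         # Unix only
    return {
        "free_gib": usage.free / 2**30,
        "free_inodes": stats.f_favail,  # inodes available to unprivileged users
    }
```

It can be called on both the output directory and the directory where temporary files are written, since the two may live on different filesystems. Where the streaming engine puts its temporary files is not stated in the post, so that location would have to be checked separately.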

PyArrow

table = pq.read_table(input_file_path)
sorted_table = table.sort_by([("l_shipdate", "ascending")])
pq.write_table(sorted_table, output_file_path)

GlareDB

Writing a sorted table to a Parquet file with GlareDB proved challenging, possibly because of its parallel Parquet writer implementation. As a workaround, we delegated the writing step to pyarrow.parquet.

con = glaredb.connect()
sql = f"""SELECT * FROM parquet_scan('{input_file_path}') ORDER BY l_shipdate ASC"""
table = con.sql(sql).to_arrow()
pq.write_table(table, where=output_file_path)
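Since a parallel writer may not preserve row order, it can be useful to check that the output file is actually sorted. A minimal sketch, not taken from the original benchmark: the helper names are hypothetical, and the column is read one row group at a time with pyarrow.parquet so that the whole table is not loaded again:

```python
from typing import Iterable, Iterator


def is_non_decreasing(values: Iterable) -> bool:
    """Return True if every value is >= the previous one (single pass)."""
    iterator = iter(values)
    try:
        previous = next(iterator)
    except StopIteration:
        return True  # an empty sequence is trivially sorted
    for current in iterator:
        if current < previous:
            return False
        previous = current
    return True


def iter_column(parquet_path: str, column: str) -> Iterator:
    """Yield the values of one column, reading one row group at a time."""
    import pyarrow.parquet as pq  # lazy import: only needed for real files

    parquet_file = pq.ParquetFile(parquet_path)
    for index in range(parquet_file.num_row_groups):
        row_group = parquet_file.read_row_group(index, columns=[column])
        yield from row_group.column(column).to_pylist()
```

A check would then look like is_non_decreasing(iter_column(output_file_path, "l_shipdate")). Converting to Python objects is slow for about 60 million rows, and the comparison assumes the column contains no nulls, so this is a correctness check rather than something to include in the timed section.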

DataFusion

DataFusion writes several Parquet files of limited size into a common folder, and we did not find a way to change this file size threshold. To get a single sorted Parquet file, we converted the result to an Arrow table and wrote it with pyarrow.parquet.

ctx = SessionContext()
ctx.register_parquet("lineitem", input_file_path)
sorted_table = ctx.sql("SELECT * FROM lineitem ORDER BY l_shipdate").to_arrow_table()
pq.write_table(sorted_table, output_file_path)

Others

Unfortunately, our attempts to use Dask resulted in crashes, which we did not investigate further. It is worth noting that it performed well on smaller tables, such as SF1 or SF3.

Results

elapsed_time

In this sorting comparison, DuckDB achieved the shortest elapsed time.
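The post does not show how the elapsed times were measured. A minimal sketch of one way to do it, assuming wall-clock timing with time.perf_counter; the helper name time_call is hypothetical:

```python
import time
from typing import Callable


def time_call(func: Callable[[], object], repeats: int = 1) -> list[float]:
    """Run `func` `repeats` times and return each wall-clock duration in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return durations
```

Each snippet above can be wrapped in a zero-argument function and passed to time_call. Because the operating system caches file reads, the first run may differ from later ones, so it helps to keep the individual durations instead of only an average.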