In this notebook, we are going to query some Parquet files with the following SQL engines:

  • DuckDB: an in-process SQL OLAP database management system. We are going to use its Python Client API [MIT license].
  • Tableau Hyper: an in-memory data engine. We are going to interact with this engine using the tableauhyperapi Python package [Proprietary License].

Both of these tools are optimized for Online analytical processing [OLAP]: we do not want to modify the data, but rather launch queries that require processing a large amount of it. DuckDB and Tableau Hyper make use of vectorized execution and some amount of parallel processing, well suited to the columnar storage format of Parquet files. This is very well described in this post from the DuckDB team. Here is a quote from that blog post:

DuckDB will read the Parquet files in a streaming fashion, which means you can perform queries on large Parquet files that do not fit in your main memory.

The Tableau Hyper engine has the ability to read Parquet files using the external keyword.

External data can be read directly in a SQL query using the set returning function external. In this case, no Hyper table is involved, so such a query can even be used if no database is attached to the current session.
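
As an illustration, here is a minimal sketch of how a Parquet file can be queried directly with both engines, following the same patterns used later in this notebook [the file path is a placeholder]:

import duckdb
from tableauhyperapi import Connection, HyperProcess, Telemetry

parquet_path = "USA-road-t.NY.gr.parquet"  # placeholder path

# DuckDB: read_parquet() exposes the Parquet file as a table-like relation
connection = duckdb.connect()
edge_count_duckdb = connection.query(
    f"SELECT COUNT(*) FROM read_parquet('{parquet_path}')"
).fetchone()[0]
connection.close()

# Tableau Hyper: the external() set returning function plays the same role
with HyperProcess(telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU) as hyper:
    with Connection(endpoint=hyper.endpoint) as connection:
        edge_count_hyper = connection.execute_scalar_query(
            f"SELECT COUNT(*) FROM external('{parquet_path}', FORMAT => 'parquet')"
        )

assert edge_count_duckdb == edge_count_hyper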

The Parquet files correspond to a very specific use case, since they all describe some road networks from the US or Europe. The US road networks were imported in a previous post: Download some benchmark road networks for Shortest Paths algorithms. The Europe networks were downloaded from this web page and converted to Parquet files. We are only going to use the edge table, not the node coordinates one. The SQL queries in this notebook are also very specific, in the sense that they are related to the graph theory domain. Here are the things that we are going to compute:

  1. occurrence of parallel edges
  2. vertex and edge counts
  3. count of connected vertices
  4. count of vertices with one incoming and one outgoing edge
  5. degree distribution

For each query and SQL engine, we are going to measure the elapsed time. We did not measure memory consumption in this post.

Notes:

  • The Parquet files are not compressed.
  • Both engines usually make use of their own optimized file format, e.g. .hyper files for Tableau Hyper. However, they both support direct querying of CSV or Parquet files [a sketch of loading a Parquet file into a native table is shown after this list].
  • We are going to use DuckDB and Tableau Hyper with the default configuration.
  • Most of the SQL queries could probably be optimized; however, we believe that they are efficient enough for the purpose of comparison in this short post.
  • In all the elapsed time bar charts, lower is better.
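
As mentioned in the notes above, each engine can also load a Parquet file into its own native format instead of querying it directly. Here is a minimal sketch of doing so with DuckDB [the file, table and database names are placeholders]; in the rest of the notebook we stick to direct querying:

import duckdb

parquet_path = "USA-road-t.NY.gr.parquet"  # placeholder path

# persist the Parquet content into DuckDB's own storage format
connection = duckdb.connect("road_networks.duckdb")  # hypothetical database file
connection.execute(
    f"CREATE TABLE graph_edges AS SELECT * FROM read_parquet('{parquet_path}')"
)
connection.close()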

Imports

Note that both tools are very easy to install:

$ pip install duckdb  
$ pip install tableauhyperapi  

Here are the imports:

import os
from time import perf_counter
import duckdb
import pandas as pd
from pandas.testing import assert_frame_equal
from tableauhyperapi import Connection, HyperProcess, Telemetry
pd.set_option("display.precision", 2)  # Pandas float number display
TELEMETRY = Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU  # not sending telemetry data to Tableau
FS = (12, 6)  # figure size
ALPHA = 0.8  # figure transparency

Package versions:

Python         : 3.10.6
duckdb         : 0.5.1
tableauhyperapi: 0.0.15530
pandas         : 1.5.0

System information:

OS             : Linux
Architecture   : 64bit
CPU            : Intel(R) Core(TM) i7-7700HQ CPU @ 2.80GHz
CPU cores      : 8
RAM            : 32GB

Apache Parquet files

names = ["NY", "BAY", "COL", "FLA", "NW", "NE", "CAL", "LKS", "E", "W", "CTR", "USA"]
stats = {}
parquet_graph_file_paths = {}
parquet_file_sizes = []
for name in names:
    parquet_graph_file_path = f"/home/francois/Data/Disk_1/DIMACS_road_networks/{name}/USA-road-t.{name}.gr.parquet"
    stats[name] = {}
    stats[name]["parquet_file_size_MB"] = (
        os.path.getsize(parquet_graph_file_path) * 1.0e-6
    )
    parquet_graph_file_paths[name] = parquet_graph_file_path
names_osmr = ["osm-bawu", "osm-ger", "osm-eur"]
names += names_osmr
for name in names_osmr:
    parquet_graph_file_path = (
        f"/home/francois/Data/Disk_1/OSMR/{name}/{name}.gr.parquet"
    )
    stats[name] = {}
    stats[name]["parquet_file_size_MB"] = (
        os.path.getsize(parquet_graph_file_path) * 1.0e-6
    )
    parquet_graph_file_paths[name] = parquet_graph_file_path
ordered_names = ["NY", "BAY", "COL", "FLA", "NW", "NE", "CAL", "osm-bawu", "LKS", "E", "W", "CTR", "osm-ger", "USA", "osm-eur"]
stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df.sort_values(by="parquet_file_size_MB", ascending=True, inplace=True)
stats_df
          parquet_file_size_MB
NY                        9.11
BAY                      10.14
COL                      13.32
FLA                      31.86
NW                       34.43
NE                       46.36
CAL                      56.40
LKS                      82.57
E                       105.59
osm-bawu                106.99
W                       184.64
CTR                     428.21
USA                     702.06
osm-ger                 730.79
osm-eur                6112.20

First query: parallel edges

In this first query, we want to check if there are parallel edges in the graph. This should not happen with the US networks, because we removed the parallel edges when we created the Parquet files in a previous post. When we imported the Europe networks, we created the Parquet files in chunks, so we can only guarantee that there is no parallel edge within each chunk, not overall. Here is the query:

query_1 = """
SELECT CASE WHEN COUNT(*) > 0 THEN 1 ELSE 0 END 
FROM (
    SELECT source, target, COUNT(*)
    FROM graph_edges
    GROUP BY source, target
    HAVING COUNT(*) > 1
)"""

We expect this query to return 0 for each graph.

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]
    connection = duckdb.connect()
    # query
    start = perf_counter()
    query = query_1.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    duplicates = connection.query(query).fetchone()[0]
    elapsed_time_s = perf_counter() - start
    connection.close()
    res_duckdb[name] = {}
    res_duckdb[name]["duplicates"] = duplicates
    assert duplicates == 0
    stats[name]["query_1_DuckDB"] = elapsed_time_s
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        parquet_graph_file_path = parquet_graph_file_paths[name]
        with Connection(endpoint=hyper.endpoint) as connection:
            # query
            start = perf_counter()
            query = query_1.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            duplicates = connection.execute_scalar_query(query)
            elapsed_time_s = perf_counter() - start
        res_hyper[name] = {}
        res_hyper[name]["duplicates"] = duplicates
        assert duplicates == 0
        stats[name]["query_1_Hyper"] = elapsed_time_s
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_1")]
query_1_df = stats_df[cols]
ax = query_1_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_1", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

(figure: Query_1 elapsed time per network, log scale)

Results

There is no parallel edge in any of the networks.

Second query: vertex and edge counts

query_2 = "SELECT COUNT(*), MAX(source), MAX(target) FROM graph_edges"

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]
    connection = duckdb.connect()
    # query
    start = perf_counter()
    query = query_2.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    res = connection.query(query).fetchall()[0]
    elapsed_time_s = perf_counter() - start
    connection.close()
    edge_count = res[0]
    vertex_count = max(res[1:3]) + 1
    stats[name]["vertex_count"] = vertex_count
    stats[name]["edge_count"] = edge_count
    stats[name]["query_2_DuckDB"] = elapsed_time_s
    res_duckdb[name] = {}
    res_duckdb[name]["vertex_count"] = vertex_count
    res_duckdb[name]["edge_count"] = edge_count
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        parquet_graph_file_path = parquet_graph_file_paths[name]
        with Connection(endpoint=hyper.endpoint) as connection:
            # query
            start = perf_counter()
            query = query_2.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            res = connection.execute_list_query(query)[0]
            elapsed_time_s = perf_counter() - start
        edge_count = res[0]
        vertex_count = max(res[1:3]) + 1
        stats[name]["query_2_Hyper"] = elapsed_time_s
        res_hyper[name] = {}
        res_hyper[name]["vertex_count"] = vertex_count
        res_hyper[name]["edge_count"] = edge_count
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_2")]
query_2_df = stats_df[cols]
ax = query_2_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_2", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

(figure: Query_2 elapsed time per network, log scale)

Results

stats_df[["vertex_count", "edge_count"]]
          vertex_count  edge_count
NY              264346      730100
BAY             321270      794830
COL             435666     1042400
FLA            1070376     2687902
NW             1207945     2820774
NE             1524453     3868020
CAL            1890815     4630444
osm-bawu       3064263     6183798
LKS            2758119     6794808
E              3598623     8708058
W              6262104    15119284
CTR           14081816    33866826
osm-ger       20690320    41791542
USA           23947347    57708624
osm-eur      173789185   347997111

Third query: count of connected vertices

Some vertices are isolated in the graph: their degree is 0. We want to count the number of connected vertices in the graph [i.e. the vertices that are not isolated].

query_3 = """
WITH edges AS (
    SELECT source, target 
    FROM graph_edges)
SELECT COUNT(*) 
FROM (
    SELECT source AS node 
    FROM edges     
        UNION     
    SELECT target AS node 
    FROM edges)"""

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]
    connection = duckdb.connect()
    # query
    start = perf_counter()
    query = query_3.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    connected_vertices = connection.query(query).fetchone()[0]
    elapsed_time_s = perf_counter() - start
    connection.close()
    stats[name]["connected_vertices"] = connected_vertices
    stats[name]["mean_degree"] = stats[name]["edge_count"] / connected_vertices
    stats[name]["query_3_DuckDB"] = elapsed_time_s
    res_duckdb[name] = {}
    res_duckdb[name]["connected_vertices"] = connected_vertices
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        parquet_graph_file_path = parquet_graph_file_paths[name]
        with Connection(endpoint=hyper.endpoint) as connection:
            # query
            start = perf_counter()
            query = query_3.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            connected_vertices = connection.execute_scalar_query(query)
            elapsed_time_s = perf_counter() - start
        stats[name]["query_3_Hyper"] = elapsed_time_s
        res_hyper[name] = {}
        res_hyper[name]["connected_vertices"] = connected_vertices
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_3")]
query_3_df = stats_df[cols]
ax = query_3_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_3", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

(figure: Query_3 elapsed time per network, log scale)

We observe that the elapsed times measured for the largest network differ a lot between DuckDB and Tableau Hyper:

Engine           Elapsed time (s)
DuckDB                      38.71
Tableau Hyper              357.47

Results

There is no isolated vertex in any of the networks.

stats_df[["vertex_count", "connected_vertices", "mean_degree"]]
          vertex_count  connected_vertices  mean_degree
NY              264346              264346         2.76
BAY             321270              321270         2.47
COL             435666              435666         2.39
FLA            1070376             1070376         2.51
NW             1207945             1207945         2.34
NE             1524453             1524453         2.54
CAL            1890815             1890815         2.45
osm-bawu       3064263             3064263         2.02
LKS            2758119             2758119         2.46
E              3598623             3598623         2.42
W              6262104             6262104         2.41
CTR           14081816            14081816         2.41
osm-ger       20690320            20690320         2.02
USA           23947347            23947347         2.41
osm-eur      173789185           173789185         2.00

Fourth query: count of vertices with one incoming and one outgoing edge

We count the vertices with exactly one incoming and one outgoing edge [in-degree = out-degree = 1]. In the following, we refer to these vertices as in-out vertices.

query_4 = """
    WITH TPARQUET AS (
        SELECT source, target 
        FROM graph_edges),
    TSOURCE AS (
        SELECT source AS node, COUNT(*) 
        FROM TPARQUET 
        GROUP BY source HAVING COUNT(*)=1),
    TTARGET AS (
        SELECT target AS node, COUNT(*) 
        FROM TPARQUET GROUP BY target HAVING COUNT(*)=1),
    TJOIN AS (
        SELECT s.node 
        FROM TSOURCE s 
        INNER JOIN TTARGET t ON s.node = t.node)
    SELECT COUNT(*) from TJOIN"""

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]
    connection = duckdb.connect()
    # query
    start = perf_counter()
    query = query_4.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    inout_vertices = connection.query(query).fetchone()[0]
    elapsed_time_s = perf_counter() - start
    connection.close()
    stats[name]["inout_vertices"] = inout_vertices
    stats[name]["query_4_DuckDB"] = elapsed_time_s
    res_duckdb[name] = {}
    res_duckdb[name]["inout_vertices"] = inout_vertices
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        parquet_graph_file_path = parquet_graph_file_paths[name]
        with Connection(endpoint=hyper.endpoint) as connection:
            # query
            start = perf_counter()
            query = query_4.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            inout_vertices = connection.execute_scalar_query(query)
            elapsed_time_s = perf_counter() - start
        stats[name]["query_4_Hyper"] = elapsed_time_s
        res_hyper[name] = {}
        res_hyper[name]["inout_vertices"] = inout_vertices
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_4")]
query_4_df = stats_df[cols]
ax = query_4_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_4", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

(figure: Query_4 elapsed time per network, log scale)

Results

stats_df["ratio"] = stats_df.inout_vertices / stats_df.vertex_count
stats_df[["inout_vertices", "vertex_count", "ratio"]]
          inout_vertices  vertex_count  ratio
NY                 41169        264346   0.16
BAY                73109        321270   0.23
COL                82537        435666   0.19
FLA               210849       1070376   0.20
NW                282638       1207945   0.23
NE                288695       1524453   0.19
CAL               373947       1890815   0.20
osm-bawu          398242       3064263   0.13
LKS               478263       2758119   0.17
E                 764838       3598623   0.21
W                1209550       6262104   0.19
CTR              2787565      14081816   0.20
osm-ger          2874055      20690320   0.14
USA              4762005      23947347   0.20
osm-eur         20309942     173789185   0.12

Fifth query: degree distribution

query_5 = """
    CREATE TEMP TABLE t_edges AS 
        SELECT source, target 
        FROM graph_edges;
    CREATE TEMP TABLE t_nodes AS
        SELECT source AS node 
        FROM t_edges     
            UNION ALL
        SELECT target AS node 
        FROM t_edges;
    CREATE TEMP TABLE t_deg AS
        SELECT COUNT(*) AS deg
        FROM t_nodes
        GROUP BY node;
    SELECT deg, COUNT(*) AS n_occ 
    FROM t_deg
    GROUP BY deg
    ORDER BY deg ASC;"""

DuckDB

res_duckdb = {}
for name in names:
    parquet_graph_file_path = parquet_graph_file_paths[name]
    connection = duckdb.connect()
    
    # query
    start = perf_counter()
    query = query_5.replace("graph_edges", f"read_parquet('{parquet_graph_file_path}')")
    queries = query.removesuffix(";").split(";")
    for sq in queries[:-1]:
        connection.execute(sq)
    sq = queries[-1]
    res = connection.query(sq).fetchall()
    elapsed_time_s = perf_counter() - start
    stats[name]["query_5_DuckDB"] = elapsed_time_s
    connection.close()
    res_duckdb[name] = {}
    for item in res:
        degree = item[0]
        vertex_count = item[1]
        res_duckdb[name]["degree_" + str(degree).zfill(3)] = vertex_count
res_duckdb_df = pd.DataFrame.from_dict(res_duckdb, orient="index")
res_duckdb_df = res_duckdb_df.sort_index(axis=1)
res_duckdb_df = res_duckdb_df.fillna(0).astype(int)

Tableau Hyper

res_hyper = {}
with HyperProcess(telemetry=TELEMETRY) as hyper:
    for name in names:
        with Connection(endpoint=hyper.endpoint) as connection:
            parquet_graph_file_path = parquet_graph_file_paths[name]
            # query
            start = perf_counter()
            query = query_5.replace(
                "graph_edges",
                f"external('{parquet_graph_file_path}', FORMAT => 'parquet')",
            )
            queries = query.removesuffix(";").split(";")
            for sq in queries[:-1]:
                connection.execute_command(sq)
            sq = queries[-1]
            res = connection.execute_list_query(sq)
            elapsed_time_s = perf_counter() - start
        for item in res:
            degree = item[0]
            vertex_count = item[1]
            stats[name]["degree_" + str(degree).zfill(3)] = vertex_count
        stats[name]["query_5_Hyper"] = elapsed_time_s
        res_hyper[name] = {}
        for item in res:
            degree = item[0]
            vertex_count = item[1]
            res_hyper[name]["degree_" + str(degree).zfill(3)] = vertex_count
res_hyper_df = pd.DataFrame.from_dict(res_hyper, orient="index")
res_hyper_df = res_hyper_df.sort_index(axis=1)
res_hyper_df = res_hyper_df.fillna(0).astype(int)

Validation

assert_frame_equal(res_duckdb_df, res_hyper_df)

Elapsed time

stats_df = pd.DataFrame.from_dict(stats, orient="index")
stats_df = stats_df.loc[[c for c in ordered_names if c in stats_df.index.values]]
cols = [c for c in stats_df.columns if c.startswith("query_5")]
query_5_df = stats_df[cols]
ax = query_5_df.plot.bar(figsize=FS, grid=True, logy=True, rot=60, alpha=ALPHA)
_ = ax.legend(["DuckDB", "Hyper"])
_ = ax.set(title="Query_5", xlabel="Network", ylabel="Elapsed time (s) - Log scale")

(figure: Query_5 elapsed time per network, log scale)

Results

degree_cols = sorted([c for c in stats_df.columns if c.startswith("degree_")])
stats_df[degree_cols] = stats_df[degree_cols].fillna(0).astype(int)
degrees = 100 *  stats_df[degree_cols].div(stats_df["vertex_count"], axis=0)
cols = degrees.columns
degrees.columns = [int(c.split('_')[-1]) for c in cols]
tot = degrees.sum(axis=0)
degrees = degrees[list(tot[tot > 0.01].index.values)]
degrees['total'] = degrees.sum(axis=1)
styler = degrees.style.background_gradient(axis=1, cmap='YlOrRd')
styler = styler.format(precision=2)
styler 
              2     3      4     5      6     7      8    10    12   total
NY        15.57  0.00  15.27  0.00  47.11  0.00  21.56  0.43  0.06  100.00
BAY       22.76  0.00  20.30  0.00  43.94  0.00  12.81  0.17  0.02  100.00
COL       18.95  0.00  33.71  0.00  36.59  0.00  10.66  0.09  0.01  100.00
FLA       19.70  0.00  22.73  0.00  44.49  0.00  12.94  0.13  0.01  100.00
NW        23.40  0.00  28.99  0.00  38.47  0.00   9.01  0.13  0.01  100.00
NE        18.94  0.00  21.90  0.00  45.95  0.00  12.96  0.23  0.03  100.00
CAL       19.78  0.00  27.53  0.00  40.89  0.00  11.66  0.14  0.01  100.00
osm-bawu  13.00  0.60  72.27  0.58  12.26  0.07   1.21  0.01  0.00  100.00
LKS       17.34  0.00  32.01  0.00  37.75  0.00  12.77  0.12  0.01  100.00
E         21.25  0.00  26.19  0.00  42.07  0.00  10.30  0.16  0.02  100.00
W         19.32  0.00  30.72  0.00  39.33  0.00  10.49  0.13  0.01  100.00
CTR       19.80  0.00  31.11  0.00  38.01  0.00  10.97  0.10  0.01  100.00
osm-ger   13.89  0.60  70.33  0.66  13.17  0.08   1.26  0.01  0.00  100.00
USA       19.89  0.00  30.27  0.00  38.97  0.00  10.75  0.12  0.01  100.00
osm-eur   11.69  0.84  76.03  0.50   9.80  0.05   1.09  0.01  0.00  100.00

Conclusion

Apache Parquet is a column-oriented data file format designed for efficient data storage and retrieval, and it is widespread in the data analysis ecosystem. Querying Parquet files directly with an efficient SQL engine is really convenient. Both engines, DuckDB and Tableau Hyper, are amazing tools that allow Parquet files to be queried efficiently, among other capabilities. We only scratched the surface of this feature in this post, with a very specific use case. We observed similar timings for most of the queries. We did not measure memory usage. However, we observed that it is important to write SQL queries that are at least somewhat optimized regarding memory consumption when dealing with large datasets and "large" queries. Also, it is advisable to specify a temporary directory for the engine, so that it can write some temporary data there [temp_directory setting with DuckDB].
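
As an example, here is a minimal sketch of setting a temporary directory with DuckDB [the path is a placeholder, and we assume the temp_directory option can be set with a SET statement]; this lets the engine spill intermediate results to disk when a query does not fit in memory:

import duckdb

connection = duckdb.connect()
# let DuckDB write temporary data to disk for memory-hungry queries
connection.execute("SET temp_directory='/path/to/duckdb_tmp';")
connection.close()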