Holy 🦆uck! Fast Analysis with DuckDB + Pyarrow
Trying out some new speedy tools for data analysis
Turning to DuckDB when you need to crunch more numbers faster than pandas in your Streamlit app 🎈
Inspired by the "DuckDB quacks Arrow" blogpost, cross-posted on the DuckDB and Arrow blogs
Background
streamlit and Streamlit Cloud are fantastic for sharing your data exploration apps. A very common pattern uses CSV files with pandas to accomplish the necessary steps of (a minimal sketch follows this list):
- Load the data into the program
- Filter data by certain columns or attributes
- Compute analyses on the data (averages, counts, etc.)
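As a minimal sketch of that pattern (the file name and slider wiring here are placeholders, not the exact demo code), a Streamlit app might look roughly like:
import pandas as pd
import streamlit as st

# 1. Load the data (placeholder file name, assuming a "date/time" timestamp column)
df = pd.read_csv("rides.csv", parse_dates=["date/time"])

# 2. Filter by an attribute chosen in the UI
hour = st.slider("Hour of day", 0, 23, 12)
subset = df[df["date/time"].dt.hour == hour]

# 3. Compute an analysis and display it
st.write(subset[["lat", "lon"]].mean())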
NYC Uber Data
- Streamlit DuckDB Uber NYC repo
- Includes the 10-year, 1.5-billion-row taxi data example as well
- Streamlit Original Uber NYC repo
Let's take this NYC Uber dataset example from Streamlit. We'll pay attention to:
- How much RAM / memory is used
- How long it takes to perform each step
import pandas as pd
import numpy as np
# import streamlit as st
# singleton ignored because we're not in streamlit anymore
# @st.experimental_singleton
def load_data():
    data = pd.read_csv(
        "uber-raw-data-sep14.csv.gz",
        nrows=100000,  # approx. 10% of data
        names=[
            "date/time",
            "lat",
            "lon",
        ],  # specify names directly since they don't change
        skiprows=1,  # don't read header since names specified directly
        usecols=[0, 1, 2],  # doesn't load last column, constant value "B02512"
        parse_dates=[
            "date/time"
        ],  # set as datetime instead of converting after the fact
    )
    return data
%%time
data = load_data()
data.info()
Feel free to reference the read_csv documentation; the focus of this post, though, is on the nrows=100000 argument. This nrows limits the number of rows that get loaded into our application. Taking in 100,000 rows landed us around 2.3 MB of memory allocation for the data, and it loaded on my computer in ~3 seconds. Let's see how that would go without our nrows limitation.
def load_full_data():
    data = pd.read_csv(
        "uber-raw-data-sep14.csv.gz",
        # nrows=100000,  # approx. 10% of data
        names=[
            "date/time",
            "lat",
            "lon",
        ],  # specify names directly since they don't change
        skiprows=1,  # don't read header since names specified directly
        usecols=[0, 1, 2],  # doesn't load last column, constant value "B02512"
        parse_dates=[
            "date/time"
        ],  # set as datetime instead of converting after the fact
    )
    return data
%%time
full_data = load_full_data()
full_data.info()
Ok, so with ~10 times as much data (1,028,136 vs 100,000 rows) we use:
- ~10 times as much memory (23.5 MB vs 2.3 MB)
- ~10 times as much time (30 s vs 2.94 s)
The first time this app loads in streamlit will be a bit slow either way, but the singleton decorator is designed to prevent having to re-compute objects like this. (Also note that this is a single month of data... a year might include ~12,337,632 entries based on this September 2014 data.)
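In the actual Streamlit app those decorators are simply left uncommented; as a rough sketch (the cached_* names here are just for illustration):
import streamlit as st

@st.experimental_singleton  # build the DataFrame once per server process and reuse it across reruns/sessions
def cached_load():
    return load_full_data()  # the ~30 s pandas load from above only runs on the first call

@st.experimental_memo  # memoize results keyed by the function arguments
def cached_filter(df, hour_selected):
    return df[df["date/time"].dt.hour == hour_selected]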
Enter the Duck
Using pyarrow and duckdb, let's see if we get any improvement.
import duckdb
import pyarrow as pa
from pyarrow import csv
import pyarrow.dataset as ds
def load_data_duckdb():
    data = csv.read_csv(
        "uber-raw-data-sep14.csv.gz",
        convert_options=csv.ConvertOptions(
            include_columns=["Date/Time", "Lat", "Lon"],
            timestamp_parsers=["%m/%d/%Y %H:%M:%S"],
        ),
    ).rename_columns(["date/time", "lat", "lon"])
    # `dataset` is for partitioning larger datasets, but can't include timestamp parsing directly:
    # data = ds.dataset("uber-raw-data-sep14.csv.gz", schema=pa.schema([
    #     ("Date/Time", pa.timestamp('s')),
    #     ('Lat', pa.float32()),
    #     ('Lon', pa.float32())
    # ]), format='csv')
    # DuckDB can query Arrow tables, so just return the table and a connection for flexible querying
    return data, duckdb.connect(":memory:")
arrow_data, con = load_data_duckdb()
arrow_data[:5]
%%timeit
load_data_duckdb()
Holy smokes! Well that was fast and fun! pyarrow read the whole dataset in 153 ms. That's 0.153 s compared to 30 s with pandas! So how much memory are pyarrow and duckdb using?
def format_bytes(size):
    """from https://stackoverflow.com/a/49361727/15685218"""
    # 2**10 = 1024
    power = 2**10
    n = 0
    power_labels = {0: '', 1: 'kilo', 2: 'mega', 3: 'giga', 4: 'tera'}
    while size > power:
        size /= power
        n += 1
    return size, power_labels[n] + 'bytes'
format_bytes(arrow_data.nbytes)
Ok, the pyarrow table is roughly the same size as the full pandas DataFrame.
con.execute('PRAGMA database_size;')
"""
database_size VARCHAR, -- total block count times the block size
block_size BIGINT, -- database block size
total_blocks BIGINT, -- total blocks in the database
used_blocks BIGINT, -- used blocks in the database
free_blocks BIGINT, -- free blocks in the database
wal_size VARCHAR, -- write ahead log size
memory_usage VARCHAR, -- memory used by the database buffer manager
memory_limit VARCHAR -- maximum memory allowed for the database
"""
database_size, block_size, total_blocks, used_blocks, free_blocks, wal_size, memory_usage, memory_limit = con.fetchall()[0]
memory_usage
We haven't told duckdb to load anything into its own tables, so it still has no memory usage. Nevertheless, duckdb can query the arrow_data since it's a pyarrow table. (duckdb can also load directly from CSV, as in the sketch below.)
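For instance, here's a small sketch of querying the gzipped CSV directly with DuckDB's read_csv_auto, no pyarrow step involved:
import duckdb

con_csv = duckdb.connect()
# read_csv_auto infers the column types (including the timestamp) on the fly
con_csv.execute(
    """
    SELECT COUNT(*), AVG("Lat"), AVG("Lon")
    FROM read_csv_auto('uber-raw-data-sep14.csv.gz')
    """
).fetchall()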
So where does that leave us on loading the full 1,000,000-row dataset?
- pandas: ~30 s of time and 23.5 MB
- pyarrow: ~0.1 s of time (153 ms) and 23.9 MB
In fairness, I tried pandas with the pyarrow engine. At the time of writing I can't find a fast datetime parse, and usecols throws an error with the pyarrow engine (see the end of this post). Reading the full CSV without datetime parsing is comparable in speed, though. (Also see "why the best CSV is not a CSV at all" for more on this path.)
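On that note, a quick sketch of the "not a CSV" path: write the Arrow table we already loaded out to Parquet once (the file name here is arbitrary), and later loads skip CSV and timestamp parsing entirely:
import pyarrow.parquet as pq

# one-time conversion of the Arrow table loaded above
pq.write_table(arrow_data, "uber-raw-data-sep14.parquet")

# subsequent loads read the columnar file directly
table = pq.read_table("uber-raw-data-sep14.parquet")
Back to the pandas pyarrow-engine read of the CSV: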
%%time
arrow_df = pd.read_csv(
    "uber-raw-data-sep14.csv.gz",
    engine='pyarrow',
    names=[
        "date/time",
        "lat",
        "lon",
        "CONST",
    ],  # specify names directly since they don't change
    skiprows=1,  # don't read header since names specified directly
    # usecols=[1, 2],  # doesn't load last column, constant value "B02512"
    parse_dates=[
        "date/time"
    ],  # set as datetime instead of converting after the fact
    # infer_datetime_format=True  # Unsupported for pyarrow
    date_parser=lambda x: pd.to_datetime(x),
)
arrow_df.info()
%%timeit
arrow_df_no_datetime = pd.read_csv(
    "uber-raw-data-sep14.csv.gz",
    engine='pyarrow',
    names=[
        "date/time",
        "lat",
        "lon",
        "CONST",
    ],  # specify names directly since they don't change
    skiprows=1,  # don't read header since names specified directly
    # usecols=[1, 2],  # doesn't load last column, constant value "B02512"
)
# @st.experimental_memo
def filterdata(df, hour_selected):
    return df[df["date/time"].dt.hour == hour_selected]

# CALCULATE MIDPOINT FOR GIVEN SET OF DATA
# @st.experimental_memo
def mpoint(lat, lon):
    return (np.average(lat), np.average(lon))

# FILTER DATA BY HOUR
# @st.experimental_memo
def histdata(df, hr):
    filtered = df[
        (df["date/time"].dt.hour >= hr) & (df["date/time"].dt.hour < (hr + 1))
    ]
    hist = np.histogram(filtered["date/time"].dt.minute, bins=60, range=(0, 60))[0]
    return pd.DataFrame({"minute": range(60), "pickups": hist})
%%timeit
# For fairness, we'll use the full dataframe
filterdata(full_data, 14)
%%timeit
mpoint(full_data["lat"], full_data["lon"])
%%timeit
histdata(full_data, 14)
How about DuckDB (with conversion back to pandas for fairness)?
def duck_filterdata(con, hour_selected):
    # DuckDB's replacement scans let the SQL reference the `arrow_data` pyarrow table by its Python variable name
    return con.query(
        f'SELECT "date/time", lat, lon FROM arrow_data WHERE hour("date/time") = {hour_selected}'
    ).to_df()

def duck_mpoint(con):
    return con.query("SELECT AVG(lat), AVG(lon) FROM arrow_data").fetchone()

def duck_histdata(con, hr):
    hist_query = f'SELECT histogram(minute("date/time")) FROM arrow_data WHERE hour("date/time") >= {hr} AND hour("date/time") < {hr + 1}'
    results, *_ = con.query(hist_query).fetchone()
    return pd.DataFrame(results)
%%timeit
duck_filterdata(con, 14)
%%timeit
duck_mpoint(con)
%%timeit
duck_histdata(con, 14)
We got a modest improvement in filterdata and more than a 10x speedup in histdata, but actually lost out to numpy for finding the average of 2 arrays in mpoint!
- filterdata:
  - pandas: 18 ms ± 65.4 µs
  - duckdb: 6.03 ms ± 10.9 µs
- mpoint:
  - numpy: 404 µs ± 1.65 µs
  - duckdb: 1.62 ms ± 16.6 µs
- histdata:
  - pandas + numpy: 39.9 ms ± 111 µs
  - duckdb: 2.86 ms ± 19.5 µs
18 / 6.03    # filterdata: ~3x faster with duckdb
404 / 1620   # mpoint: numpy is ~4x faster than duckdb here
39.9 / 2.86  # histdata: ~14x faster with duckdb
Larger Than Memory Data
Where this DuckDB + Arrow combo really shines is analyzing data that can't be handled by Pandas on your machine. In many cases, if the data doesn't fit in your computer's memory (RAM), Pandas will consume disk (swap) to try to fit it, which slows things down. With the 10-year dataset below, the DuckDB authors found Pandas used 248 GB (!!!) of memory to read out ~300,000 rows from the ~1,500,000,000. In that case it just crashes most laptops.
So libraries such as Dask evolved to handle these out-of-core situations through multiprocessing and distributed computing, and Pandas has a whole list of related ecosystem projects.
To cut through data on a single laptop, DuckDB + Arrow + the Parquet format provide impressive enough optimizations that we don't need those 248 GB on any number of machines.
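The nyc-taxi/ directory used below holds the taxi data as Parquet files partitioned by year and month (download instructions are linked at the end of this section). If you were assembling such a partitioned dataset yourself, a rough sketch with pyarrow.dataset might look like this (here `table` stands in for an Arrow table that already has year and month columns):
import pyarrow.dataset as ds

ds.write_dataset(
    table,  # hypothetical Arrow table with year and month columns
    "nyc-taxi/",
    format="parquet",
    partitioning=["year", "month"],  # one directory level per partition value
)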
def load_from_10_year():
    nyc = ds.dataset("nyc-taxi/", partitioning=["year", "month"])
    # Get database connection
    con = duckdb.connect()
    # Run query that selects part of the data
    query = con.execute(
        "SELECT total_amount, passenger_count, year FROM nyc WHERE total_amount > 100 AND year > 2014"
    )
    # Create Record Batch Reader from Query Result.
    # "fetch_record_batch()" also accepts an extra parameter for the desired chunk size.
    record_batch_reader = query.fetch_record_batch()
    # Retrieve all batch chunks
    all_chunks = []
    while True:
        try:
            # Process a single chunk here (pyarrow.lib.RecordBatch)
            chunk = record_batch_reader.read_next_batch()
            all_chunks.append(chunk)
        except StopIteration:
            break
    data = pa.Table.from_batches(all_chunks)
    return data
load_from_10_year()
%%timeit
load_from_10_year()
Less than 3 seconds to sift through 1.5 billion rows of data...
Let's look at how long it takes to iterate over the whole set in chunks (the inefficient solution to most out-of-memory issues).
%%time
nyc = ds.dataset("nyc-taxi/", partitioning=["year", "month"])
# Get database connection
con = duckdb.connect()
query = con.execute("SELECT total_amount FROM nyc")
record_batch_reader = query.fetch_record_batch()
total_rows = 0
while True:
    try:
        chunk = record_batch_reader.read_next_batch()
        total_rows += len(chunk)
    except StopIteration:
        break
total_rows
Iterating is definitely not going to work for user-interactive apps. 2-3 seconds is bearable for most users (with a loading indicator), but 4 minutes is far too long to be engaging.
Pandas would currently need to perform this whole iteration, or load the whole dataset, to process the query we asked before.
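As a rough sketch of what that pandas-side iteration could look like (assuming the partitions are laid out as nyc-taxi/<year>/<month>/*.parquet, matching the dataset above):
from pathlib import Path
import pandas as pd

frames = []
for path in sorted(Path("nyc-taxi").glob("*/*/*.parquet")):
    year = int(path.parts[1])  # directory layout assumed: nyc-taxi/<year>/<month>/<file>.parquet
    if year <= 2014:
        continue  # the partition pruning DuckDB gets from the directory names
    chunk = pd.read_parquet(path, columns=["total_amount", "passenger_count"])
    chunk = chunk.loc[chunk["total_amount"] > 100].assign(year=year)
    frames.append(chunk)

result = pd.concat(frames, ignore_index=True)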
Download instructions (from the Ursa Labs S3 bucket) and a Streamlit demo using the 10-year set are available in the same repo.
Conclusions
It's no secret that Python is not a fast language, but there are tricks to speed it up.
Common advice is to utilize C optimizations via numpy and pandas. Another new contender is the C++-driven duckdb as an in-process OLAP database manager. It takes some re-writing of Python code into SQL (or using the Relational API, or another library such as the Ibis Project), but it can play nicely with pandas and pyarrow.
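As a taste of the relational API, duck_mpoint from earlier could be written without raw SQL, roughly like this (using the arrow_data table and con from above; method names follow recent duckdb Python releases):
rel = con.from_arrow(arrow_data)  # expose the pyarrow table as a DuckDB relation
rel.aggregate("AVG(lat) AS lat, AVG(lon) AS lon").df()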
Speaking of Arrow 🏹, it seems to be efficient and growing in popularity and adoption. streamlit 🎈 utilizes it to simplify sending objects in protobufs between browser and server. pandas 🐼 has further integrations on their roadmap. polars 🐻❄️ uses it to power their Rust-written DataFrame library.
This post explores an example streamlit app that utilizes some pandas and numpy functions such as read_csv, average, and DataFrame slicing. Using pyarrow to load data gives a speedup over the default pandas engine, and using duckdb to generate new views of the data also speeds up difficult computations. It also touches on the power of this combination for processing larger-than-memory datasets efficiently on a single machine.
# Appendix: the pandas + engine='pyarrow' attempts referenced above
pd.read_csv(
    "uber-raw-data-sep14.csv.gz",
    # nrows=100000,  # approx. 10% of data
    engine='pyarrow',
    names=[
        "date/time",
        "lat",
        "lon",
        # "CONST"
    ],  # specify names directly since they don't change
    skiprows=1,  # don't read header since names specified directly
    # usecols=[1, 2],  # doesn't load last column, constant value "B02512"
    parse_dates=[
        "date/time"
    ],  # set as datetime instead of converting after the fact
    # infer_datetime_format=True  # Unsupported for pyarrow
    date_parser=lambda x: pd.to_datetime(x),
)
# The usecols attempt referenced above (this is where usecols throws an error with the pyarrow engine)
pd.read_csv(
    "uber-raw-data-sep14.csv.gz",
    # nrows=100000,  # approx. 10% of data
    engine='pyarrow',
    # names=[
    #     "date/time",
    #     "lat",
    #     "lon",
    #     "CONST"
    # ],  # specify names directly since they don't change
    # skiprows=1,  # don't read header since names specified directly
    usecols=[0, 1],  # only load the first two columns
    # parse_dates=[
    #     "date/time"
    # ],  # set as datetime instead of converting after the fact
    # infer_datetime_format=True  # Unsupported for pyarrow
    # date_parser=lambda x: pd.to_datetime(x)
).info()