In Pandas, PyArrow, fastparquet, AWS Data Wrangler, PySpark and Dask.
Introduction
I have recently gotten more familiar with how to work with Parquet datasets across the six major tools used to read and write from Parquet in the Python ecosystem: Pandas, PyArrow, fastparquet, AWS Data Wrangler, PySpark and Dask. My recent work in algorithmic trading involves switching between these tools a lot, and I often mix up their APIs. I use Pandas and PyArrow for in-RAM computing and machine learning, PySpark for ETL, Dask for parallel computing with numpy.arrays, and AWS Data Wrangler with Pandas and Amazon S3. I’ve used fastparquet with Pandas when its PyArrow engine has a problem, but this was my first time using it directly.
The very first thing I do when I work with a new columnar dataset of any size is to convert it to Parquet format… and yet I constantly forget the APIs for doing so as I work across different libraries and computing platforms. I’m tired of looking up these different tools and their APIs, so I decided to write down instructions for all of them in one place. Starting as a Stack Overflow answer here and expanded into this post, I’ve written an overview of the Parquet format plus a guide and cheatsheet for the Pythonic tools that use Parquet, so that I (and hopefully you) never have to look for them ever again.
Parquet format is optimized in three main ways: columnar storage, columnar compression and data partitioning. A fourth optimization is row groups, but I won’t cover those today, as most tools don’t support associating keys with particular row groups without some hacking. Below I go over each of these optimizations and then show you how to take advantage of them using the popular Pythonic data tools.
Column-Oriented Storage and Compression
Human readable data formats like CSV and JSON, as well as most common transactional SQL databases, are stored in rows. As you scroll down lines in a row-oriented file, the columns are laid out in a format-specific way across the line. Text compresses quite well these days, so you can get away with a lot of computing using these formats. At some point, however, as the size of your data enters the gigabyte range, loading and writing data on a single machine grind to a halt and take forever. This becomes a major hindrance to data science and machine learning engineering, which is inherently iterative. Long iteration time is a first-order roadblock to the efficient programmer. Something must be done!
Enter column-oriented data formats. These formats store each column of data together and can load them one at a time. This leads to two performance optimizations:
- You only pay for the columns you load. This is called columnar storage.
Let m be the total number of columns in a file and n be the number of columns requested by the user. Loading n columns results in just n/m of the raw I/O volume.
- The similarity of values within separate columns results in more efficient compression. This is called columnar compression.
Note the event_type column in both row and column-oriented formats in the diagram below. A compression algorithm will have a much easier time compressing repeats of the value party in this column if they make up the entire value for that row, as in the column-oriented format. By contrast, the row-oriented format requires the compression algorithm to figure out that repeats occur at some offset in the row, an offset which will vary based on the values in the previous columns. This is a much more difficult task.
Columnar storage combines with columnar compression to produce dramatic performance improvements for most applications that do not require every column in the file. I have often used PySpark to load CSV or JSON data that took a long time to load and converted it to Parquet format, after which using it with PySpark or even on a single computer in Pandas became quick and painless.
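As a quick illustration of that kind of CSV-to-Parquet conversion, here is a minimal PySpark sketch; the events.csv input path, the events.parquet output path and the app name are hypothetical, not taken from my pipeline.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('CSV to Parquet').getOrCreate()

# Read the row-oriented CSV, inferring column types from the data
events = spark.read.csv('events.csv', header=True, inferSchema=True)

# Write it back out as columnar, snappy-compressed Parquet
events.write.mode('overwrite').parquet('events.parquet', compression='snappy')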
Columnar Partitioning
The other way Parquet makes data more efficient is by partitioning data on the unique values within one or more columns. Each unique value in a column-wise partitioning scheme is called a key. Using a format originally defined by Apache Hive, one folder is created for each key, with additional keys stored in sub-folders. This is called columnar partitioning, and it combines with columnar storage and columnar compression to dramatically improve I/O performance when loading part of a dataset corresponding to a partition key.
A Parquet dataset partitioned on gender and country would look like this:
path
└── to
└── table
├── gender=male
│ ├── …
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── …
Each unique value for the columns gender and country gets a folder and sub-folder, respectively. The leaves of these partition folder trees contain Parquet files using columnar storage and columnar compression, so any improvement in efficiency is on top of those optimizations!
Columnar partitioning optimizes loading data in the following way:
- Let l be the total number of unique keys within a column and k be the number of keys of interest. Loading k keys results in just k/l of the raw I/O volume.
Row Group Partitioning
There is also row group partitioning if you need to further logically partition your data, but most tools only support specifying the row group size, and you have to do the `key → row group` lookup yourself, which limits its use. I recently used financial data that partitioned individual assets by their identifiers using row groups, but since the tools don’t support this it was painful to load multiple keys: you had to manually parse the Parquet metadata to match each key to its corresponding row group.
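To give a flavor of that manual lookup, here is a minimal sketch using PyArrow’s row group metadata; the assets.parquet file, the asset_id column and the 'AAPL' key are all hypothetical, and it assumes the writer recorded min/max statistics for that column.
import pyarrow.parquet as pq

pf = pq.ParquetFile('assets.parquet')
key_column = pf.schema_arrow.get_field_index('asset_id')

# Scan the row group statistics in the file metadata for our key
matching_row_groups = []
for i in range(pf.metadata.num_row_groups):
    stats = pf.metadata.row_group(i).column(key_column).statistics
    if stats is not None and stats.has_min_max and stats.min <= 'AAPL' <= stats.max:
        matching_row_groups.append(i)

# Read only the row groups that can contain the key
table = pf.read_row_groups(matching_row_groups)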
For more information on how the Parquet format works, check out the excellent PySpark Parquet documentation.
Parquet Partitions using Pandas & PyArrow
Pandas integrates with two libraries that support Parquet: PyArrow and fastparquet. They are specified via the engine argument of pandas.read_parquet() and pandas.DataFrame.to_parquet().
To store certain columns of your pandas.DataFrame using data partitioning with Pandas and PyArrow, use the compression='snappy', engine='pyarrow' and partition_cols=[] arguments. Snappy compression is needed if you want to append data.
df.to_parquet(
path='analytics',
engine='pyarrow',
compression='snappy',
partition_cols=['event_name', 'event_category']
)
This lays the folder tree and files like so:
analytics/event_name=SomeEvent/event_category=SomeCategory/part-1.snappy.parquet
analytics/event_name=SomeEvent/event_category=OtherCategory/part-1.snappy.parquet
analytics/event_name=OtherEvent/event_category=SomeCategory/part-1.snappy.parquet
analytics/event_name=OtherEvent/event_category=OtherCategory/part-1.snappy.parquet
Now that the Parquet files are laid out in this way, we can use partition column keys in a filter to limit the data we load. The pandas.read_parquet() method accepts engine, columns and filters arguments. The columns argument takes advantage of columnar storage and columnar compression, loading only the files corresponding to the columns we ask for in an efficient manner. The filters argument takes advantage of data partitioning by limiting the data loaded to certain folders corresponding to one or more keys in a partition column. Below we load the compressed event_name and other_column columns from the event_name partition folder SomeEvent.
import pandas as pd

df = pd.read_parquet(
path='analytics',
engine='pyarrow',
columns=['event_name', 'other_column'],
filters=[('event_name', '=', 'SomeEvent')]
)
Reading Parquet Partitions using PyArrow
PyArrow has its own API you can use directly, which is a good idea if using Pandas directly results in errors. To load records from one or more partitions of a Parquet dataset using PyArrow based on their partition keys, we create an instance of pyarrow.parquet.ParquetDataset using the filters argument with a tuple filter inside of a list (more on this below).
ParquetDatasets beget Tables which beget pandas.DataFrames. To convert certain columns of this ParquetDataset into a pyarrow.Table we use ParquetDataset.to_table(columns=[]). to_table() gets its arguments from the scan() method. This is followed by to_pandas() to create a pandas.DataFrame. Don’t worry, the I/O only happens lazily at the end.
Here we load the columns event_name and other_column from within the Parquet partition on S3 corresponding to the event_name value of SomeEvent from the analytics bucket. Both to_table() and to_pandas() have a use_threads parameter you should use to accelerate performance.
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
's3://analytics',
filesystem=fs,
filters=[('event_name', '=', 'SomeEvent')],
use_threads=True
)
df = dataset.to_table(
columns=['event_name', 'other_column'],
use_threads=True
).to_pandas()
You can see below that the use of threads results in many threads reading from S3 concurrently over my home network.
You can load a single file or local folder directly into a pyarrow.Table using pyarrow.parquet.read_table(), but this doesn’t support S3 yet.
import pyarrow.parquet as pq

df = pq.read_table(
'analytics.parquet',
columns=['event_name', 'other_column']
).to_pandas()
PyArrow Boolean Partition Filtering
The documentation for partition filtering via the filters argument below is rather complicated, but it boils down to this: nest tuples within a list for AND, and nest those lists within an outer list for OR.
filters (List[Tuple] or List[List[Tuple]] or None (default)): Rows which do not match the filter predicate will be removed from scanned data. Partition keys embedded in a nested directory structure will be exploited to avoid loading files at all if they contain no matching rows. If use_legacy_dataset is True, filters can only reference partition keys and only a hive-style directory structure is supported. When setting use_legacy_dataset to False, also within-file level filtering and different partitioning schemes are supported.
Predicates are expressed in disjunctive normal form (DNF), like [[('x', '=', 0), ...], ...]. DNF allows arbitrary boolean logical combinations of single column predicates. The innermost tuples each describe a single column predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective and multiple column predicate. Finally, the most outer list combines these filters as a disjunction (OR).
Predicates may also be passed as List[Tuple]. This form is interpreted as a single conjunction. To express OR in predicates, one must use the (preferred) List[List[Tuple]] notation.
To use both partition keys to grab records corresponding to the event_name key SomeEvent and its sub-partition event_category key SomeCategory, we use boolean AND logic: a single list of two filter tuples.
dataset = pq.ParquetDataset(
's3://analytics',
filesystem=fs,
filters=[
('event_name', '=', 'SomeEvent'),
('event_category', '=', 'SomeCategory')
]
)
df = dataset.to_table(
columns=['event_name', 'other_column']
).to_pandas()
To load records from both the SomeEvent and OtherEvent keys of the event_name partition we use boolean OR logic: nesting the filter tuples in their own AND inner lists within an outer OR list.
dataset = pq.ParquetDataset(
's3://analytics',
filesystem=fs,
validate_schema=False,
filters=[
[('event_name', '=', 'SomeEvent')],
[('event_name', '=', 'OtherEvent')]
]
)
df = dataset.to_table(
columns=['event_name', 'other_column']
).to_pandas()
Writing Parquet Datasets with PyArrow
PyArrow writes partitioned Parquet datasets using pyarrow.parquet.write_to_dataset().
import pyarrow
import pyarrow.parquet as pq

table = pyarrow.Table.from_pandas(df)

pq.write_to_dataset(
table,
'analytics',
partition_cols=['event_name', 'other_column'],
use_legacy_dataset=False
)
For writing Parquet datasets to Amazon S3 with PyArrow you need to use the s3fs package’s s3fs.S3FileSystem class (which you can configure with credentials via the key and secret options if you need to, or it can use ~/.aws/credentials):
import pyarrow
import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()
table = pyarrow.Table.from_pandas(df)

pq.write_to_dataset(
table,
's3://analytics',
partition_cols=['event_name', 'other_column'],
use_legacy_dataset=False,
filesystem=s3
)
Parquet Partitions on S3 with AWS Data Wrangler
The easiest way to work with partitioned Parquet datasets on Amazon S3 using Pandas is with AWS Data Wrangler, via the awswrangler PyPI package and its awswrangler.s3.to_parquet() and awswrangler.s3.read_parquet() methods. AWS provides excellent examples in this notebook. Note that Wrangler is powered by PyArrow, but offers a simple interface with great features.
To write partitioned data to S3, set dataset=True and partition_cols=[]. You will want to set use_threads=True to improve performance.
import awswrangler as wr

wr.s3.to_parquet(
df=df,
path='s3://analytics',
dataset=True,
partition_cols=['event_name', 'event_category'],
use_threads=True,
compression='snappy',
mode='overwrite'
)
Reading Parquet data with partition filtering works differently than with PyArrow. With awswrangler you use a partition_filter function to filter to certain partition keys.
df = wr.s3.read_parquet(
path='s3://analytics',
dataset=True,
columns=['event_name', 'other_column'],
partition_filter=lambda x: x['event_name'] == 'SomeEvent',
use_threads=True
)
Note that in either method you can pass in your own boto3_session if you need to authenticate or set other S3 options.
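For example, a minimal sketch of passing your own session might look like this, assuming a hypothetical AWS profile named analytics in ~/.aws/credentials:
import boto3
import awswrangler as wr

# Build a session from a named profile instead of the default credentials
session = boto3.Session(profile_name='analytics', region_name='us-east-1')

df = wr.s3.read_parquet(
    path='s3://analytics',
    dataset=True,
    boto3_session=session
)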
Parquet Partitions with fastparquet
Fastparquet is a Parquet library created by the people that brought us Dask, a wonderful distributed computing engine I’ll talk about below. I hadn’t used fastparquet directly before writing this post, and I was excited to try it. To write data from a pandas DataFrame in Parquet format, use fastparquet.write.
import fastparquet

# file_scheme='hive' is needed for partition_on to produce a partitioned directory tree
fastparquet.write(
'analytics',
df,
compression='SNAPPY',
file_scheme='hive',
partition_on=['event_name', 'event_category']
)
To load certain columns of a partitioned collection you use fastparquet.ParquetFile and ParquetFile.to_pandas(). ParquetFile won’t take a directory name as the path argument, so you will have to walk the directory path of your collection and extract all the Parquet filenames. Then you supply the root directory as an argument and fastparquet can read your partition scheme. Tuple filters work just like in PyArrow.
import os
from glob import glob
import fastparquet

# Walk the directory and find all the parquet files within
parquet_root = 'analytics'
parquet_files = [y for x in os.walk(parquet_root) for y in glob(os.path.join(x[0], '*.parquet'))]

# The root argument lets it know where to look for partitions
pf = fastparquet.ParquetFile(parquet_files, root=parquet_root)

# Now we convert to pd.DataFrame specifying columns and filters
df = pf.to_pandas(
columns=['event_name', 'other_column'],
filters=[('event_name', '=', 'SomeEvent')]
)
That’s it! Ultimately I couldn’t get fastparquet to work because my data was laboriously compressed by PySpark using snappy compression, which fastparquet does not support reading. I’ve no doubt it works, however, as I’ve used it many times in Pandas via the engine='fastparquet' argument whenever the PyArrow engine has a bug :)
Parquet Partitions with PySpark
There is a hard limit to the size of data you can process on one machine using Pandas. Beyond that limit you’re looking at using tools like PySpark or Dask. As a Hadoop evangelist I learned to think in map/reduce/iterate and I’m fluent in PySpark, so I use it often. PySpark uses the pyspark.sql.DataFrame API to work with Parquet datasets. To create a partitioned Parquet dataset from a DataFrame, use the pyspark.sql.DataFrameWriter class, normally accessed via a DataFrame's write property, and its parquet() method with the partitionBy argument.
df.write.mode('overwrite').parquet(
path='s3://analytics',
partitionBy=['event_type', 'event_category'],
compression='snappy'
)
To read this partitioned Parquet dataset back in PySpark use pyspark.sql.DataFrameReader.parquet(), usually accessed via the SparkSession.read property. Chain the pyspark.sql.DataFrame.select() method to select certain columns and the pyspark.sql.DataFrame.filter() method to filter to certain partitions.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('Analytics Application') \
.getOrCreate()

df = spark.read.parquet('s3://analytics') \
.select('event_type', 'other_column') \
.filter(F.column('event_type') == 'SomeEvent')
You don’t need to tell Spark anything about Parquet optimizations, it just figures out how to take advantage of columnar storage, columnar compression and data partitioning all on its own. Pretty cool, eh?
Parquet Partitions with Dask
Dask is the distributed computing framework for Python you’ll want to use if you need to move around numpy.arrays, which happens a lot in machine learning or GPU computing in general (see: RAPIDS). This is something that PySpark simply cannot do and the reason it has its own independent toolset for anything to do with machine learning. To adopt PySpark for your machine learning pipelines you have to adopt Spark ML (MLlib). Not so for Dask! You can use the standard Python tools. Before I found HuggingFace Tokenizers (which is so fast one Rust pid will do) I used Dask to tokenize data in parallel. I’ve also used it in search applications for bulk encoding documents in a large corpus using fine-tuned BERT and Sentence-BERT models.
I struggled with Dask during the early days, but I’ve come to love it since I started running my own workers (you shouldn’t have to, I started out in QA automation and consequently break things at an alarming rate). If you’re using Dask it is probably to use one or more machines to process datasets in parallel, so you’ll want to load Parquet files with Dask’s own APIs rather than using Pandas and then converting to a dask.dataframe.DataFrame.
There are excellent docs on reading and writing Dask DataFrames. You do so via dask.dataframe.read_parquet() and dask.dataframe.to_parquet(). To read a Dask DataFrame from Amazon S3, supply the path, the columns you want, any partition filters and your storage options. You can pick between the fastparquet and PyArrow engines. read_parquet() returns as many partitions as there are Parquet files, so keep in mind that you may need to repartition() once you load to make use of all your computer(s)’ cores.
import dask.dataframe as dd
import s3fs
from dask.distributed import Client

client = Client('127.0.0.1:8786')

# Setup AWS configuration and credentials
storage_options = {
"client_kwargs": {
"region_name": "us-east-1",
},
"key": aws_access_key_id,
"secret": aws_secret_access_key
}

# TICKERS is a list of event_name partition keys defined elsewhere
ddf = dd.read_parquet(
path='s3://analytics',
columns=['event_name', 'other_column'],
filters=[('event_name', 'in', TICKERS)],
storage_options=storage_options,
engine='pyarrow',
)
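As a rough sketch, repartitioning after the load might look like this; the choice of eight partitions is arbitrary and should roughly match the number of cores available to your workers.
# Spread the loaded data across enough partitions to keep every core busy
ddf = ddf.repartition(npartitions=8)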
To immediately write a Dask DataFrame to partitioned Parquet format, use dask.dataframe.to_parquet(). Note that Dask will write one file per partition, so again you may want to repartition() to as many files as you’d like to read in parallel, keeping in mind how many partition keys your partition columns have, as each will have its own directory.
import dask.dataframe as dd
import s3fs

dd.to_parquet(
ddf,
's3://analytics',
compression='snappy',
partition_on=['event_name', 'event_type'],
compute=True,
)
Conclusion
Whew, that’s it! We’ve covered all the ways you can read and write Parquet datasets in Python using columnar storage, columnar compression and data partitioning. Used together, these three optimizations provide near-random access to data, which can dramatically improve access speeds.
Hopefully this post helps you work with Parquet and become much more productive :) If no one else reads it, I know that I will numerous times over the years as I switch between these tools and get mixed up about their APIs and syntax.