Python and Parquet Performance

In Pandas, PyArrow, fastparquet, AWS Data Wrangler, PySpark and Dask.



This post outlines how to use all common Python libraries to read and write Parquet format while taking advantage of columnar storage, columnar compression and data partitioning. Used together, these three optimizations can dramatically accelerate I/O for your Python applications compared to CSV, JSON, HDF or other row-based formats. Parquet makes applications possible that are simply impossible using a text format like JSON or CSV.

Introduction

I have recently gotten more familiar with how to work with Parquet datasets across the six major tools used to read and write from Parquet in the Python ecosystem: Pandas, PyArrow, fastparquet, AWS Data Wrangler, PySpark and Dask. My recent work in algorithmic trading involves switching between these tools a lot, and I often mix up their APIs. I use Pandas and PyArrow for in-RAM computing and machine learning, PySpark for ETL, Dask for parallel computing with numpy.arrays and AWS Data Wrangler with Pandas and Amazon S3. I’ve used fastparquet with pandas when its PyArrow engine has a problem, but this was my first time using it directly.

The very first thing I do when I work with a new columnar dataset of any size is to convert it to Parquet format… and yet I constantly forget the APIs for doing so as I work across different libraries and computing platforms. I’m tired of looking up these different tools and their APIs, so I decided to write down instructions for all of them in one place. What started as a Stack Overflow answer has expanded into this post: an overview of the Parquet format plus a guide and cheatsheet for the Pythonic tools that use it, so that I (and hopefully you) never have to go looking for them again.

Parquet format is optimized in three main ways: columnar storage, columnar compression and data partitioning. The fourth way is by row groups, but I won’t cover those today as most tools don’t support associating keys with particular row groups without some hacking. Below I go over each of these optimizations and then show you how to take advantage of each of them using the popular Pythonic data tools.

Column-Oriented Storage and Compression

Human-readable data formats like CSV and JSON, as well as most common transactional SQL databases, store data in rows. As you scroll down a row-oriented file, the columns of each record are laid out across the line in a format-specific way. Text compresses quite well these days, so you can get away with quite a lot of computing using these formats. At some point, however, as the size of your data enters the gigabyte range, loading and writing data on a single machine grind to a halt and take forever. This becomes a major hindrance to data science and machine learning engineering, which are inherently iterative. Long iteration time is a first-order roadblock to the efficient programmer. Something must be done!

Enter column-oriented data formats. These formats store each column of data together and can load them one at a time. This leads to two performance optimizations:

  1. You only pay for the columns you load. This is called columnar storage.

    Let m be the total number of columns in a file and n be the number of columns requested by the user. Loading n columns results in just n/m of the raw I/O volume.
  2. The similarity of values within separate columns results in more efficient compression. This is called columnar compression.

    Note the event_type column in both row and column-oriented formats in the diagram below. A compression algorithm will have a much easier time compressing repeats of the value party in this column if they make up the entire value for that row, as in the column-oriented format. By contrast, the row-oriented format requires the compression algorithm to find repeats at some offset in the row, an offset that varies based on the values in the preceding columns. This is a much more difficult task.
The column-oriented storage format can load just the columns of interest. Within these columns, similar or repeated values such as ‘party’ within the ‘event_type’ column compress more efficiently.

Columnar storage combines with columnar compression to produce dramatic performance improvements for most applications that do not require every column in the file. I have often used PySpark to load CSV or JSON data that took a long time to parse, convert it to Parquet format once, and from then on work with it in PySpark, or even on a single computer in Pandas, quickly and painlessly. A sketch of that conversion appears below.
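The following is a minimal sketch of that one-time conversion, assuming a local SparkSession and hypothetical events.csv and analytics_parquet paths; substitute your own input, output and options.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('CSV to Parquet').getOrCreate()

# Read the slow, row-oriented CSV once, letting Spark infer the schema
events = spark.read.csv('events.csv', header=True, inferSchema=True)

# Write it back out as snappy-compressed, column-oriented Parquet
events.write.mode('overwrite').parquet('analytics_parquet', compression='snappy')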

Columnar Partitioning

The other way Parquet makes data more efficient is by partitioning data on the unique values within one or more columns. Each unique value in a column-wise partitioning scheme is called a key. Using a format originally defined by Apache Hive, one folder is created for each key, with additional keys stored in sub-folders. This is called columnar partitioning, and it combines with columnar storage and columnar compression to dramatically improve I/O performance when loading part of a dataset corresponding to a partition key.

A Parquet dataset partitioned on gender and country would look like this:

path
└── to
    └── table
        ├── gender=male
        │   ├── …
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── …

Each unique value for the columns gender and country gets a folder and sub-folder, respectively. The leaves of these partition folder trees contain Parquet files using columnar storage and columnar compression, so any improvement in efficiency is on top of those optimizations!

Columnar partitioning optimizes loading data in the following way:

  1. Let l be the number of keys within a column and k be the number of keys of interest. Loading k keys results in just k/l of the raw I/O volume (a small sketch of this pruning follows).
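To make the pruning concrete, here is a hand-rolled sketch, assuming the hypothetical analytics layout used throughout this post, of what a partition-aware reader effectively does with a Hive-style tree: it only collects files whose key=value folder names match the filter, so unmatched partitions are never read.

from glob import glob

# Hypothetical partitioned layout: analytics/event_name=<key>/part-*.parquet
wanted_keys = {'SomeEvent', 'OtherEvent'}

# Keep only the files whose folder name encodes a key we care about
matching_files = [
    path
    for path in glob('analytics/event_name=*/*.parquet')
    if path.split('event_name=')[1].split('/')[0] in wanted_keys
]
print(matching_files)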

Row Group Partitioning

There is also row group partitioning if you need to further logically partition your data, but most tools only support specifying row group size and you have to do the key → row group lookup yourself, which limits its use. I recently used financial data that partitioned individual assets by their identifiers using row groups, and because the tools don’t support this it was painful to load multiple keys: you have to manually parse the Parquet metadata to match each key to its corresponding row group. A sketch of that lookup appears below.
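For illustration, here is a minimal sketch of that manual lookup with PyArrow, assuming a hypothetical single file assets.parquet whose row groups are keyed on a hypothetical asset_id column and were written with per-group statistics: scan each row group’s min/max statistics, then read only the groups that could contain the key.

import pyarrow as pa
import pyarrow.parquet as pq

pf = pq.ParquetFile('assets.parquet')  # hypothetical single-file dataset
key = 'AAPL'                           # hypothetical asset identifier

matching = []
for i in range(pf.metadata.num_row_groups):
    row_group = pf.metadata.row_group(i)
    for j in range(row_group.num_columns):
        column = row_group.column(j)
        stats = column.statistics
        # Only consider the key column, and only if statistics were written
        if column.path_in_schema == 'asset_id' and stats is not None:
            if stats.min <= key <= stats.max:
                matching.append(pf.read_row_group(i))

# Concatenate whichever row groups could contain the key
table = pa.concat_tables(matching)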

For more information on how the Parquet format works, check out the excellent PySpark Parquet documentation.

Parquet Partitions using Pandas & PyArrow

Pandas integrates with two libraries that support Parquet: PyArrow and fastparquet. They are specified via the engine argument of pandas.read_parquet() and pandas.DataFrame.to_parquet().

To store certain columns of your pandas.DataFrame using data partitioning with Pandas and PyArrow, use the compression='snappy', engine='pyarrow' and partition_cols=[] arguments. Snappy compression is needed if you want to append data.

df.to_parquet(
    path='analytics',
    engine='pyarrow',
    compression='snappy',
    partition_cols=['event_name', 'event_category']
)

This lays the folder tree and files like so:

analytics/event_name=SomeEvent/event_category=SomeCategory/part-1.snappy.parquet
analytics/event_name=SomeEvent/event_category=OtherCategory/part-1.snappy.parquet
analytics/event_name=OtherEvent/event_category=SomeCategory/part-1.snappy.parquet
analytics/event_name=OtherEvent/event_category=OtherCategory/part-1.snappy.parquet

Now that the Parquet files are laid out in this way, we can use partition column keys in a filter to limit the data we load. The pandas.read_parquet() method accepts engine, columns and filters arguments. The columns argument takes advantage of columnar storage and columnar compression, efficiently loading only the column chunks we ask for. The filters argument takes advantage of data partitioning by limiting the data loaded to the folders corresponding to one or more keys in a partition column. Below we load the compressed event_name and other_column columns from the event_name partition folder SomeEvent.

df = pd.read_parquet(
    path='analytics',
    engine='pyarrow',
    columns=['event_name', 'other_column'],
    filters=[('event_name', '=', 'SomeEvent')]
)

Reading Parquet Partitions using PyArrow

PyArrow has its own API you can use directly, which is a good idea if using Pandas directly results in errors. To load records from one or more partitions of a Parquet dataset using PyArrow based on their partition keys, we create an instance of pyarrow.parquet.ParquetDataset using the filters argument with a tuple filter inside of a list (more on this below).

ParquetDatasets beget Tables which beget pandas.DataFrames. To convert certain columns of this ParquetDataset into a pyarrow.Table we use ParquetDataset.to_table(columns=[]); to_table() gets its arguments from the scan() method. This is followed by to_pandas() to create a pandas.DataFrame. Don’t worry, the I/O only happens lazily at the end.

Here we load the columns event_name and other_column from within the Parquet partition on S3 corresponding to the event_name value of SomeEvent from the analytics dataset. Both to_table() and to_pandas() have a use_threads parameter you should set to accelerate performance.

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

dataset = pq.ParquetDataset(
    's3://analytics',
    filesystem=fs,
    filters=[('event_name', '=', 'SomeEvent')]
)
df = dataset.to_table(
    columns=['event_name', 'other_column'],
    use_threads=True
).to_pandas()

Using threads as above results in many threads reading from S3 concurrently over my home network connection.

You can load a single file or local folder directly into a pyarrow.Table using pyarrow.parquet.read_table(), but this doesn’t support S3 yet.

import pyarrow.parquet as pq

df = pq.read_table(
    'analytics.parquet',
    columns=['event_name', 'other_column']
).to_pandas()

PyArrow Boolean Partition Filtering

The documentation for partition filtering via the filters argument below is rather complicated, but it boils down to this: tuples within an inner list are combined with AND, and the inner lists within the outer list are combined with OR.

filters (List[Tuple] or List[List[Tuple]] or None (default)): Rows which do not match the filter predicate will be removed from scanned data. Partition keys embedded in a nested directory structure will be exploited to avoid loading files at all if they contain no matching rows. If use_legacy_dataset is True, filters can only reference partition keys and only a hive-style directory structure is supported. When setting use_legacy_dataset to False, also within-file level filtering and different partitioning schemes are supported.

Predicates are expressed in disjunctive normal form (DNF), like [[('x', '=', 0), ...], ...]. DNF allows arbitrary boolean logical combinations of single column predicates. The innermost tuples each describe a single column predicate. The list of inner predicates is interpreted as a conjunction (AND), forming a more selective and multiple column predicate. Finally, the most outer list combines these filters as a disjunction (OR).

Predicates may also be passed as List[Tuple]. This form is interpreted as a single conjunction. To express OR in predicates, one must use the (preferred) List[List[Tuple]] notation.

To use both partition keys to grab records corresponding to the event_name key SomeEvent and its sub-partition event_category key SomeCategory we use boolean AND logic - a single list of two filter tuples.

dataset = pq.ParquetDataset(
    's3://analytics',
    filesystem=fs,
    filters=[
        ('event_name', '=', 'SomeEvent'),
        ('event_category', '=', 'SomeCategory')
    ]
)
df = dataset.to_table(
    columns=['event_name', 'other_column']
).to_pandas()

To load records from both the SomeEvent and OtherEvent keys of the event_name partition we use boolean OR logic - nesting the filter tuples in their own AND inner lists within an outer OR list.

dataset = pq.ParquetDataset(
    's3://analytics',
    filesystem=fs,
    validate_schema=False,
    filters=[
        [('event_name', '=', 'SomeEvent')],
        [('event_name', '=', 'OtherEvent')]
    ]
)
df = dataset.to_table(
    columns=['event_name', 'other_column']
).to_pandas()
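You can also combine the two forms: each inner list is its own AND clause, and the outer list ORs the clauses together. Here is a minimal sketch, reusing the same hypothetical dataset, that selects SomeEvent rows in SomeCategory plus all OtherEvent rows.

dataset = pq.ParquetDataset(
    's3://analytics',
    filesystem=fs,
    filters=[
        # (event_name = SomeEvent AND event_category = SomeCategory)
        [('event_name', '=', 'SomeEvent'), ('event_category', '=', 'SomeCategory')],
        # OR (event_name = OtherEvent)
        [('event_name', '=', 'OtherEvent')]
    ]
)
df = dataset.to_table(
    columns=['event_name', 'other_column']
).to_pandas()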

Writing Parquet Datasets with PyArrow

PyArrow writes partitioned Parquet datasets using pyarrow.parquet.write_to_dataset(); pyarrow.parquet.write_table() writes a single file.

import pyarrow
import pyarrow.parquet as pq

table = pyarrow.Table.from_pandas(df)
pq.write_to_dataset(
    table,
    'analytics',
    partition_cols=['event_name', 'other_column'],
    use_legacy_dataset=False
)

For writing Parquet datasets to Amazon S3 with PyArrow you need to use the s3fs package’s s3fs.S3FileSystem class (which you can configure with credentials via the key and secret options if you need to, or it can use ~/.aws/credentials):

import pyarrow
import pyarrow.parquet as pq
import s3fs

s3 = s3fs.S3FileSystem()

table = pyarrow.Table.from_pandas(df)
pq.write_to_dataset(
    table,
    's3://analytics',
    partition_cols=['event_name', 'other_column'],
    use_legacy_dataset=False,
    filesystem=s3
)
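If you do need to pass credentials explicitly rather than relying on ~/.aws/credentials or the environment, a minimal sketch looks like this; the key and secret values are placeholders.

import s3fs

# Placeholders: substitute your own credentials, or omit them entirely and
# let s3fs fall back to ~/.aws/credentials or environment variables
s3 = s3fs.S3FileSystem(
    key='AKIA...',
    secret='...'
)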

Parquet Partitions on S3 with AWS Data Wrangler

The easiest way to work with partitioned Parquet datasets on Amazon S3 using Pandas is with AWS Data Wrangler, via the awswrangler PyPI package and its awswrangler.s3.to_parquet() and awswrangler.s3.read_parquet() methods. AWS provides excellent examples in this notebook. Note that Wrangler is powered by PyArrow, but offers a simple interface with great features.

To write partitioned data to S3, set dataset=True and partition_cols=[]. You will want to set use_threads=True to improve performance.

import awswrangler as wr

wr.s3.to_parquet(
    df=df,
    path='s3://analytics',
    dataset=True,
    partition_cols=['event_name', 'event_category'],
    use_threads=True,
    compression='snappy',
    mode='overwrite'
)

Reading Parquet data with partition filtering works differently than with PyArrow. With awswrangler you pass a function as the partition_filter argument that decides which partition keys to load.

df = wr.s3.read_parquet(
    path='s3://analytics',
    dataset=True,
    columns=['event_name', 'other_column'],
    partition_filter=lambda x: x['event_name'] == 'SomeEvent',
    use_threads=True
)

Note that in either method you can pass in your own boto3_session if you need to authenticate or set other S3 options.
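A minimal sketch of passing an explicit session, assuming a hypothetical named profile called analytics in your AWS config:

import boto3
import awswrangler as wr

# Hypothetical named profile; any boto3.Session works here
session = boto3.Session(profile_name='analytics', region_name='us-east-1')

df = wr.s3.read_parquet(
    path='s3://analytics',
    dataset=True,
    columns=['event_name', 'other_column'],
    boto3_session=session
)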

Parquet Partitions with fastparquet

Fastparquet is a Parquet library created by the people that brought us Dask, a wonderful distributed computing engine I’ll talk about below. I hadn’t used FastParquet directly before writing this post, and I was excited to try it. To write data from a pandas DataFrame in Parquet format, use fastparquet.write.

import fastparquet

fastparquet.write(
    'analytics',                  # output directory for the dataset
    df,
    compression='SNAPPY',
    file_scheme='hive',           # required for partition_on to create key=value folders
    partition_on=['event_name', 'event_category']
)

To load certain columns of a partitioned collection you use fastparquet.ParquetFile and ParquetFile.to_pandas(). ParquetFile won’t take a directory name as the path argument, so you will have to walk the directory tree of your collection and extract all the Parquet filenames. Then you supply the root directory as an argument and fastparquet can read your partition scheme. Tuple filters work much like PyArrow’s.

import os
from glob import glob
import fastparquet

# Walk the directory and find all the parquet files within
parquet_root = 'analytics'
parquet_files = [y for x in os.walk(parquet_root)
                 for y in glob(os.path.join(x[0], '*.parquet'))]

# The root argument lets it know where to look for partitions
pf = fastparquet.ParquetFile(parquet_files, root=parquet_root)

# Now we convert to pd.DataFrame, specifying columns and filters
df = pf.to_pandas(
    columns=['event_name', 'other_column'],
    filters=[('event_name', '==', 'SomeEvent')]
)

That’s it! Ultimately I couldn’t get FastParquet to work because my data was laboriously compressed by PySpark using snappy compression, which fastparquet does not support reading. I’ve no doubt it works, however, as I’ve used it many times in Pandas via the engine='fastparquet' argument whenever the PyArrow engine has a bug :)
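That pandas fallback is a one-argument change; here is a minimal sketch against the same hypothetical analytics dataset.

import pandas as pd

# Same call as before, just swapping the engine when PyArrow misbehaves
df = pd.read_parquet(
    'analytics',
    engine='fastparquet',
    columns=['event_name', 'other_column']
)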

Parquet Partitions with PySpark

There is a hard limit to the size of data you can process on one machine using Pandas. Beyond that limit you’re looking at using tools like PySpark or Dask. As a Hadoop evangelist I learned to think in map/reduce/iterate and I’m fluent in PySpark, so I use it often. PySpark uses the pyspark.sql.DataFrame API to work with Parquet datasets. To create a partitioned Parquet dataset from a DataFrame, use the pyspark.sql.DataFrameWriter class (normally accessed via a DataFrame's write property), calling its parquet() method with the partitionBy=[] argument.

df.write.mode('overwrite').parquet(
    path='s3://analytics',
    partitionBy=['event_type', 'event_category'],
    compression='snappy'
)

To read this partitioned Parquet dataset back in PySpark use pyspark.sql.DataFrameReader.parquet(), usually accessed via the SparkSession.read property. Chain the pyspark.sql.DataFrame.select() method to select certain columns and the pyspark.sql.DataFrame.filter() method to filter to certain partitions.

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Analytics Application') \
    .getOrCreate()

df = spark.read.parquet('s3://analytics') \
    .select('event_type', 'other_column') \
    .filter(F.column('event_type') == 'SomeEvent')

You don’t need to tell Spark anything about Parquet optimizations, it just figures out how to take advantage of columnar storage, columnar compression and data partitioning all on its own. Pretty cool, eh?
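If you want to see it doing so, one option (a sketch, not specific to this dataset) is to print the query plans; for a Parquet scan Spark typically reports the partition filters it pruned on and the column filters it pushed down.

# Print the query plans for the DataFrame built above; the Parquet scan node
# typically lists PartitionFilters and PushedFilters
df.explain(True)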

Parquet Partitions with Dask

Dask is the distributed computing framework for Python you’ll want to use if you need to move around numpy.arrays — which happens a lot in machine learning or GPU computing in general (see: RAPIDS). This is something that PySpark simply cannot do and the reason it has its own independent toolset for anything to do with machine learning. To adopt PySpark for your machine learning pipelines you have to adopt Spark ML (MLlib). Not so for Dask! You can use the standard Python tools. Before I found HuggingFace Tokenizers (which is so fast one Rust pid will do) I used Dask to tokenize data in parallel. I’ve also used it in search applications for bulk encoding documents in a large corpus using fine-tuned BERT and Sentence-BERT models.

I struggled with Dask during the early days, but I’ve come to love it since I started running my own workers (you shouldn’t have to, I started out in QA automation and consequently break things at an alarming rate). If you’re using Dask it is probably to use one or more machines to process datasets in parallel, so you’ll want to load Parquet files with Dask’s own APIs rather than using Pandas and then converting to a dask.dataframe.DataFrame.

There are excellent docs on reading and writing Dask DataFrames. You do so via dask.dataframe.read_parquet() and dask.dataframe.to_parquet(). To read a Dask DataFrame from Amazon S3, supply the path, the columns you want, a filters list in the same tuple form the other libraries use, any storage options and the engine. You can pick between fastparquet and PyArrow engines. read_parquet() returns as many partitions as there are Parquet files, so keep in mind that you may need to repartition() once you load to make use of all your computer(s)’ cores (see the one-liner after the example below).

import dask.dataframe as dd
import s3fs
from dask.distributed import Client

client = Client('127.0.0.1:8786')

# Setup AWS configuration and credentials
storage_options = {
    "client_kwargs": {
        "region_name": "us-east-1",
    },
    "key": aws_access_key_id,
    "secret": aws_secret_access_key
}

ddf = dd.read_parquet(
    path='s3://analytics',
    columns=['event_name', 'other_column'],
    filters=[('event_name', 'in', TICKERS)],
    storage_options=storage_options,
    engine='pyarrow'
)
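If the number of partitions read does not match the number of cores across your workers, repartitioning after the load is a one-liner; 64 here is just an illustrative number.

# Spread the work across however many partitions suit your cluster
ddf = ddf.repartition(npartitions=64)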

To write a Dask DataFrame to partitioned Parquet format, use dask.dataframe.to_parquet(). Note that Dask will write one file per partition, so again you may want to repartition() to as many files as you’d like to read in parallel, keeping in mind how many partition keys your partition columns have as each will have its own directory.

import dask.dataframe as dd
import s3fs

dd.to_parquet(
    ddf,
    's3://analytics',
    compression='snappy',
    partition_on=['event_name', 'event_type'],
    compute=True
)

Conclusion

Whew, that’s it! We’ve covered all the ways you can read and write Parquet datasets in Python using columnar storage, columnar compression and data partitioning. Used together, these three optimizations provide near random access to data, which can dramatically improve access speeds.

Hopefully this helps you work with Parquet much more productively :) If no one else reads this post, I know that I will, numerous times over the years, as I switch between these tools and get mixed up about their APIs and syntax.
