Parquet2
Parquet2 is a Rust library to interact with the parquet format. Welcome to its guide!
This guide describes how to efficiently and safely read from and write to parquet. Before starting, there are two concepts to introduce in the context of this guide:
- IO-bound operations: perform either disk reads or network calls (e.g. s3)
- CPU-bound operations: perform compute
In this guide, "read", "write" and "seek" correspond to IO-bound operations, "decompress", "compress", "deserialize", etc. are CPU-bound.
Generally, IO-bound operations are parallelizable with green threads, while CPU-bound operations are not.
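A common way to exploit this split is to drive the IO-bound steps from an async runtime and move the CPU-bound steps to a separate thread pool. Below is a minimal sketch of that pattern using tokio; the functions fetch_column_chunk and decompress_and_deserialize are hypothetical placeholders, not part of this crate's API.
use tokio::task;

// hypothetical placeholder for an IO-bound operation (e.g. a network read from s3)
async fn fetch_column_chunk() -> Vec<u8> {
    vec![]
}

// hypothetical placeholder for CPU-bound work (decompress + decode + deserialize)
fn decompress_and_deserialize(compressed: Vec<u8>) -> usize {
    compressed.len()
}

#[tokio::main]
async fn main() {
    // IO-bound: runs on the async runtime ("green threads")
    let compressed = fetch_column_chunk().await;

    // CPU-bound: moved off the async executor onto a blocking thread pool
    let result = task::spawn_blocking(move || decompress_and_deserialize(compressed))
        .await
        .unwrap();
    println!("{result}");
}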
Metadata
The starting point of reading a parquet file is reading its metadata (at the end of the file). To do so, we offer two functions, one for sync and one for async reads:
Sync
parquet2::read::read_metadata, for sync reads:
use std::env;
use parquet2::read::read_metadata;

let args: Vec<String> = env::args().collect();
let path = &args[1];

let mut reader = std::fs::File::open(path)?;
let metadata = read_metadata(&mut reader)?;
println!("{:#?}", metadata);
Async
parquet2::read::read_metadata_async, for async reads (using tokio::fs as an example):
use async_compat::Compat;
use parquet2::{error::Result, read::read_metadata_async};
use tokio::fs;

#[tokio::main]
async fn main() -> Result<()> {
    let mut reader = Compat::new(fs::File::open("path.parquet").await?);

    let metadata = read_metadata_async(&mut reader).await?;

    // metadata
    println!("{:#?}", metadata);
    Ok(())
}
In both cases, metadata: FileMetaData is the file's metadata.
Columns, Row Groups, Column chunks and Pages
At this point, it is important to give a small introduction to the format itself. The metadata does not contain any data. Instead, the metadata contains the necessary information to read, decompress, decode and deserialize data. Generally:
- a file has a schema with columns and data
- data in the file is divided into row groups
- each row group contains column chunks
- each column chunk contains pages
- each page contains multiple values
Each of the entities above has associated metadata. Except for the pages, all of this metadata is already available in the FileMetaData.
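As a small sketch of this hierarchy, the row groups and their column chunks can be enumerated directly from the metadata obtained above (the num_rows accessor used below is an assumption of this sketch):
// walk the hierarchy: file -> row groups -> column chunks
for (i, row_group) in metadata.row_groups.iter().enumerate() {
    // num_rows() is assumed here to report the number of rows in the row group
    println!("row group {} has {} rows", i, row_group.num_rows());
    println!("row group {} has {} column chunks", i, row_group.columns().len());
}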
Here we will focus on a single column to show how we can read it.
We access the metadata of a column chunk via
let row_group = 0;
let column = 0;
let columns = metadata.row_groups[row_group].columns();
let column_metadata = &columns[column];
From this, we can produce an iterator of compressed pages (sync), parquet2::read::get_page_iterator, or a stream (async) of compressed pages, parquet2::read::get_page_stream:
use parquet2::read::get_page_iterator;
let pages = get_page_iterator(column_metadata, &mut reader, None, vec![], 1024 * 1024)?;
In both cases, they yield individual CompressedDataPages. Note that these pages do hold values and own potentially large chunks of (compressed) memory.
At this point, we are missing 3 steps: decompress, decode and deserialize.
Decompression is done via decompress:
use parquet2::page::Page;

let mut decompress_buffer = vec![];
let mut dict = None;
for maybe_page in pages {
    let page = maybe_page?;
    let page = parquet2::read::decompress(page, &mut decompress_buffer)?;

    match page {
        Page::Dict(page) => {
            // the first page may be a dictionary page, which needs to be deserialized
            // depending on your target in-memory format, you may want to deserialize
            // the values differently...
            // let page = deserialize_dict(&page)?;
            dict = Some(page);
        }
        Page::Data(page) => {
            let _array = deserialize(&page, dict.as_ref())?;
        }
    }
}
Decoding and deserialization are usually done in the same step, as follows:
use parquet2::encoding::Encoding;
use parquet2::error::Result;
use parquet2::page::{split_buffer, DataPage, DictPage, Page};
use parquet2::schema::types::PhysicalType;

fn deserialize(page: &DataPage, dict: Option<&DictPage>) -> Result<()> {
    // split the data buffer into repetition levels, definition levels and values
    let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page)?;

    // decode and deserialize.
    match (
        page.descriptor.primitive_type.physical_type,
        page.encoding(),
        dict,
    ) {
        (
            PhysicalType::Int32,
            Encoding::PlainDictionary | Encoding::RleDictionary,
            Some(_dict_page),
        ) => {
            // plain encoded page with a dictionary
            // _dict_page can be downcasted based on the descriptor's physical type
            todo!()
        }
        (PhysicalType::Int32, Encoding::Plain, None) => {
            // plain encoded page
            todo!()
        }
        _ => todo!(),
    }
}
The details of each todo! are highly specific to the target in-memory format. Thus, here we only describe how to decompose the page buffer into its individual components and what you need to worry about.
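As a deliberately simplified illustration, a plain-encoded Int32 page without definition or repetition levels stores its values as consecutive little-endian 4-byte integers, so one of the todo!s above could be sketched as follows (handling of nulls and levels is left out):
// minimal sketch: decode a PLAIN-encoded Int32 values buffer,
// assuming every value is present (no definition levels to apply)
fn deserialize_int32_plain(values_buffer: &[u8]) -> Vec<i32> {
    values_buffer
        .chunks_exact(4)
        .map(|chunk| i32::from_le_bytes(chunk.try_into().unwrap()))
        .collect()
}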
For example, reading to Apache Arrow often does not require decoding the definition levels, as they have the same representation as in Arrow, but does require deserialization of some values, as e.g. Arrow supports unsigned integers while parquet only accepts (potentially encoded) i32 and i64 integers.
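For instance, an unsigned 32-bit integer column is physically stored as i32; recovering the logical value is a bit-level reinterpretation:
// a logical u32 stored as a physical i32: the cast reinterprets the bits
let physical: i32 = -1;
let logical = physical as u32; // 4294967295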
Refer to the integration tests' implementation for deserialization to a simple in-memory format, and to arrow2 for an implementation targeting the Apache Arrow format.
Row group statistics
The metadata of row groups can contain row group statistics that can be used to push down filter operations.
The statistics are encoded based on the physical type of the column and are represented via trait objects of the trait Statistics, which can be downcast based on its Statistics::physical_type():
if let Some(maybe_stats) = column_metadata.statistics() {
    let stats = maybe_stats?;

    use parquet2::statistics::PrimitiveStatistics;
    match stats.physical_type() {
        PhysicalType::Int32 => {
            let stats = stats
                .as_any()
                .downcast_ref::<PrimitiveStatistics<i32>>()
                .unwrap();
            let _min: i32 = stats.min_value.unwrap();
            let _max: i32 = stats.max_value.unwrap();
            let _null_count: i64 = stats.null_count.unwrap();
        }
        PhysicalType::Int64 => {
            let stats = stats
                .as_any()
                .downcast_ref::<PrimitiveStatistics<i64>>()
                .unwrap();
            let _min: i64 = stats.min_value.unwrap();
            let _max: i64 = stats.max_value.unwrap();
            let _null_count: i64 = stats.null_count.unwrap();
        }
        _ => todo!(),
    }
}
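These values can feed a pushdown decision: under a hypothetical equality filter such as `"column 0" == 100i32`, the row group can be skipped whenever 100 falls outside [min, max]. A minimal sketch:
// hypothetical pushdown check for an equality filter on an Int32 column:
// if the value cannot lie within [min, max], no row in the row group matches
fn can_skip_row_group(min: i32, max: i32, filter_value: i32) -> bool {
    filter_value < min || filter_value > max
}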
Bloom filters
The column metadata may contain bloom filter bitsets that can be used to push down filter operations to row groups.
This crate offers the necessary functionality to check whether an item is not in a column chunk:
use parquet2::bloom_filter;

let mut bitset = vec![];
bloom_filter::read(column_metadata, &mut reader, &mut bitset)?;
if !bitset.is_empty() {
    // there is a bitset, we can use it to check if elements are in the column chunk

    // assume that our query engine had resulted in the filter `"column 0" == 100i64`
    // (it also verified that column 0 is i64 in parquet)
    let value = 100i64;

    // we hash this value
    let hash = bloom_filter::hash_native(value);

    // and check if the hash is in the bitset.
    let _in_set = bloom_filter::is_in_set(&bitset, hash);

    // if not (false), we could skip this entire row group, because no item hits the filter
    // this can naturally be applied over multiple columns.
    // if yes (true), the item _may_ be in the row group, and we usually can't skip it.
}
Column and page indexes
The column metadata may contain column and page indexes that can be used to push down filters when reading (IO) pages.
This crate offers the necessary functionality to read these indexes:
// read the column indexes of every column
use parquet2::read;
let index = read::read_columns_indexes(&mut reader, columns)?;
// these are the minimum and maximum within each page, which can be used
// to skip pages.
println!("{index:?}");
// read the offset indexes containing page locations of every column
let pages = read::read_pages_locations(&mut reader, columns)?;
println!("{pages:?}");
Sidecar
When writing multiple parquet files, it is common to have a "sidecar" metadata file containing the combined metadata of all files, including statistics.
This crate supports this use-case, as shown in the example below:
use parquet2::{
    error::Error,
    metadata::SchemaDescriptor,
    schema::types::{ParquetType, PhysicalType},
    write::{write_metadata_sidecar, FileWriter, Version, WriteOptions},
};

fn main() -> Result<(), Error> {
    // say we have 10 files with the same schema:
    let schema = SchemaDescriptor::new(
        "schema".to_string(),
        vec![ParquetType::from_physical(
            "c1".to_string(),
            PhysicalType::Int32,
        )],
    );

    // we can collect their metadata after they are written
    let mut metadatas = vec![];
    for i in 0..10 {
        let relative_path = format!("part-{i}.parquet");

        let writer = std::io::Cursor::new(vec![]);
        let mut writer = FileWriter::new(
            writer,
            schema.clone(),
            WriteOptions {
                write_statistics: true,
                version: Version::V2,
            },
            None,
        );

        // we write row groups to it
        // writer.write(row_group)

        // and the footer
        writer.end(None)?;
        let (_, mut metadata) = writer.into_inner_and_metadata();

        // once done, we write the relative path to the column chunks
        metadata.row_groups.iter_mut().for_each(|row_group| {
            row_group
                .columns
                .iter_mut()
                .for_each(|column| column.file_path = Some(relative_path.clone()))
        });

        // and collect the metadata
        metadatas.push(metadata);
    }

    // we can then merge their row groups
    let first = metadatas.pop().unwrap();
    let sidecar = metadatas.into_iter().fold(first, |mut acc, metadata| {
        acc.row_groups.extend(metadata.row_groups.into_iter());
        acc
    });

    // and write the metadata on a separate file
    let mut writer = std::io::Cursor::new(vec![]);
    write_metadata_sidecar(&mut writer, &sidecar)?;

    Ok(())
}
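Since write_metadata_sidecar uses the regular parquet footer framing, the resulting sidecar can presumably be read back with the read_metadata shown at the start of this guide; the following (assumed) round-trip sketch would continue right before the final Ok(()) above:
// assumed round-trip: parse the sidecar back with the standard footer reader
let mut reader = std::io::Cursor::new(writer.into_inner());
let combined = parquet2::read::read_metadata(&mut reader)?;
println!("merged row groups: {}", combined.row_groups.len());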