Parquet2

Parquet2 is a Rust library to interact with the parquet format. Welcome to its guide!

This guide describes how to efficiently and safely read and write to and from parquet. Before starting, there are two concepts to introduce in the context of this guide:

  • IO-bound operations: perform either disk reads or network calls (e.g. s3)
  • CPU-bound operations: perform compute

In this guide, "read", "write" and "seek" correspond to IO-bound operations, while "decompress", "compress", "deserialize", etc. are CPU-bound.

Generally, IO-bound operations are parallelizable with green threads, while CPU-bound operations are not.
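
For example (a minimal sketch, not part of this crate's API, assuming the tokio runtime; the function names are illustrative), one common pattern is to keep reads on the async executor and offload CPU-bound work to a worker thread:

    // a minimal sketch assuming the tokio runtime; `decompress_somehow` is a
    // placeholder for any CPU-bound step (decompression, decoding, ...).
    async fn read_and_decompress(compressed: Vec<u8>) -> Vec<u8> {
        // CPU-bound work is moved off the async executor so it does not block IO tasks
        tokio::task::spawn_blocking(move || decompress_somehow(compressed))
            .await
            .expect("the blocking task panicked")
    }

    fn decompress_somehow(data: Vec<u8>) -> Vec<u8> {
        // placeholder for a CPU-bound operation
        data
    }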

Metadata

The starting point of reading a parquet file is reading its metadata (located at the end of the file). To do so, we offer two functions, one for sync and one for async reads:

Sync

parquet2::read::read_metadata for sync reads:

    use std::env;
    let args: Vec<String> = env::args().collect();

    let path = &args[1];

    use parquet2::read::read_metadata;
    let mut reader = std::fs::File::open(path)?;
    let metadata = read_metadata(&mut reader)?;

    println!("{:#?}", metadata);

Async

and parquet2::read::read_metadata_async, for async reads (using tokio::fs as an example):

use async_compat::Compat;
use parquet2::{error::Result, read::read_metadata_async};
use tokio::fs;

#[tokio::main]
async fn main() -> Result<()> {
    let mut reader = Compat::new(fs::File::open("path.parquet").await?);
    let metadata = read_metadata_async(&mut reader).await?;

    // metadata
    println!("{:#?}", metadata);
    Ok(())
}

In both cases, metadata: FileMetaData is the file's metadata.
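
For example, it exposes the number of rows and the row groups of the file. A small sketch (field names follow this crate's FileMetaData; row_groups is also used throughout this guide, num_rows is assumed here):

    // `num_rows` is the total number of rows across all row groups
    println!("number of rows: {}", metadata.num_rows);
    println!("number of row groups: {}", metadata.row_groups.len());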

Columns, Row Groups, Column Chunks and Pages

At this point, it is important to give a small introduction to the format itself. The metadata does not contain any data. Instead, the metadata contains the necessary information to read, decompress, decode and deserialize data. Generally:

  • a file has a schema with columns and data
  • data in the file is divided into row groups
  • each row group contains column chunks
  • each column chunk contains pages
  • each page contains multiple values

Each of the entities above has associated metadata. Except for the pages, all this metadata is already available in the FileMetaData. Here we will focus on a single column to show how we can read it.
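
Before doing so, here is a small orientation sketch that walks this hierarchy from the metadata (it only uses the row_groups field and columns(), both of which also appear in the snippets below; pages are not listed in the metadata and are only materialized while reading):

    // row groups -> column chunks; each column chunk will later yield pages
    for (i, row_group) in metadata.row_groups.iter().enumerate() {
        println!("row group {} has {} column chunks", i, row_group.columns().len());
    }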

We access the metadata of a column chunk via

    let row_group = 0;
    let column = 0;
    let columns = metadata.row_groups[row_group].columns();
    let column_metadata = &columns[column];

From this, we can produce an iterator (sync) of compressed pages, parquet2::read::get_page_iterator, or a stream (async) of compressed pages, parquet2::read::get_page_stream:

    use parquet2::read::get_page_iterator;
    let pages = get_page_iterator(column_metadata, &mut reader, None, vec![], 1024 * 1024)?;

In both cases, they yield individual CompressedDataPages. Note that these pages do hold values and own potentially large chunks of (compressed) memory.

At this point, we are missing 3 steps: decompress, decode and deserialize. Decompression is done via decompress:

    use parquet2::page::Page;

    let mut decompress_buffer = vec![];
    let mut dict = None;
    for maybe_page in pages {
        let page = maybe_page?;
        let page = parquet2::read::decompress(page, &mut decompress_buffer)?;

        match page {
            Page::Dict(page) => {
                // the first page may be a dictionary page, which needs to be deserialized
                // depending on your target in-memory format, you may want to deserialize
                // the values differently...
                // let page = deserialize_dict(&page)?;
                dict = Some(page);
            }
            Page::Data(page) => {
                let _array = deserialize(&page, dict.as_ref())?;
            }
        }
    }

Decoding and deserialization are usually done in the same step, as follows:

use parquet2::encoding::Encoding;
use parquet2::error::Result;
use parquet2::page::{split_buffer, DataPage, DictPage, Page};
use parquet2::schema::types::PhysicalType;

fn deserialize(page: &DataPage, dict: Option<&DictPage>) -> Result<()> {
    // split the data buffer in repetition levels, definition levels and values
    let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page)?;

    // decode and deserialize.
    match (
        page.descriptor.primitive_type.physical_type,
        page.encoding(),
        dict,
    ) {
        (
            PhysicalType::Int32,
            Encoding::PlainDictionary | Encoding::RleDictionary,
            Some(_dict_page),
        ) => {
            // plain encoded page with a dictionary
            // _dict_page can be downcasted based on the descriptor's physical type
            todo!()
        }
        (PhysicalType::Int32, Encoding::Plain, None) => {
            // plain encoded page
            todo!()
        }
        _ => todo!(),
    }
}

The details of the todo! are highly specific to the target in-memory format. Thus, here we only describe how to decompose the page buffer into its individual components and what you need to worry about. For example, reading to Apache Arrow often does not require decoding the definition levels, as they have the same representation as in Arrow, but does require deserializing some values, as e.g. Arrow supports unsigned integers while parquet only accepts (potentially encoded) i32 and i64 integers. Refer to the integration tests' implementation for deserialization to a simple in-memory format, and to arrow2 for an implementation to the Apache Arrow format.
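
As an illustration, below is a hedged sketch of the (PhysicalType::Int32, Encoding::Plain, None) branch for a required (non-nullable) column, where the values buffer is simply a sequence of little-endian i32s. Nullable columns additionally require decoding the definition levels (hybrid RLE/bit-packed), which is omitted here:

    // a sketch, not this crate's API: decode a PLAIN-encoded buffer of i32 values
    fn deserialize_plain_i32(values_buffer: &[u8]) -> Vec<i32> {
        values_buffer
            .chunks_exact(std::mem::size_of::<i32>())
            .map(|chunk| i32::from_le_bytes(chunk.try_into().unwrap()))
            .collect()
    }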

Row group statistics

The metadata of row groups can contain row group statistics that can be used to push down filter operations.

The statistics are encoded based on the physical type of the column and are represented via trait objects of the trait Statistics, which can be downcast based on Statistics::physical_type():

    if let Some(maybe_stats) = column_metadata.statistics() {
        let stats = maybe_stats?;
        use parquet2::statistics::PrimitiveStatistics;
        match stats.physical_type() {
            PhysicalType::Int32 => {
                let stats = stats
                    .as_any()
                    .downcast_ref::<PrimitiveStatistics<i32>>()
                    .unwrap();
                let _min: i32 = stats.min_value.unwrap();
                let _max: i32 = stats.max_value.unwrap();
                let _null_count: i64 = stats.null_count.unwrap();
            }
            PhysicalType::Int64 => {
                let stats = stats
                    .as_any()
                    .downcast_ref::<PrimitiveStatistics<i64>>()
                    .unwrap();
                let _min: i64 = stats.min_value.unwrap();
                let _max: i64 = stats.max_value.unwrap();
                let _null_count: i64 = stats.null_count.unwrap();
            }
            _ => todo!(),
        }
    }
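
As an example of how these statistics enable pushdown, suppose the query contains the hypothetical filter c1 > 10 and c1 is an Int32 column (a sketch; the maximum value comes from the snippet above):

    // a sketch: if the maximum value of the column chunk is not greater than 10,
    // no row of this row group can match `c1 > 10` and the whole group can be skipped.
    fn can_skip_row_group(max_value: i32) -> bool {
        max_value <= 10
    }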

Bloom filters

The column metadata may contain bloom filter bitsets that can be used to push down filter operations to row groups.

This crate offers the necessary functionality to check whether an item is not in a column chunk:

    use parquet2::bloom_filter;

    let mut bitset = vec![];
    bloom_filter::read(column_metadata, &mut reader, &mut bitset)?;
    if !bitset.is_empty() {
        // there is a bitset, we can use it to check if elements are in the column chunk

        // assume that our query engine produced the filter `"column 0" == 100i64` (it also verified that column 0 is an i64 in parquet)
        let value = 100i64;

        // we hash this value
        let hash = bloom_filter::hash_native(value);

        // and check if the hash is in the bitset.
        let _in_set = bloom_filter::is_in_set(&bitset, hash);
        // if not (false), we could skip this entire row group, because no item hits the filter
        // this can naturally be applied over multiple columns.
        // if yes (true), the item _may_ be in the row group, and we usually can't skip it.
    }

Column and page indexes

The column metadata may contain column and page indexes that can be used to push down filters when reading (IO) pages.

This crate offers the necessary functionality to read these indexes, which can be used to skip pages when reading:

    // read the column indexes of every column
    use parquet2::read;
    let index = read::read_columns_indexes(&mut reader, columns)?;
    // these are the minimum and maximum within each page, which can be used
    // to skip pages.
    println!("{index:?}");

    // read the offset indexes containing page locations of every column
    let pages = read::read_pages_locations(&mut reader, columns)?;
    println!("{pages:?}");

Sidecar

When writing multiple parquet files, it is common to have a "sidecar" metadata file containing the combined metadata of all files, including statistics.

This crate supports this use-case, as shown in the example below:

use parquet2::{
    error::Error,
    metadata::SchemaDescriptor,
    schema::types::{ParquetType, PhysicalType},
    write::{write_metadata_sidecar, FileWriter, Version, WriteOptions},
};

fn main() -> Result<(), Error> {
    // say we have 10 files with the same schema:
    let schema = SchemaDescriptor::new(
        "schema".to_string(),
        vec![ParquetType::from_physical(
            "c1".to_string(),
            PhysicalType::Int32,
        )],
    );

    // we can collect their metadata after they are written
    let mut metadatas = vec![];
    for i in 0..10 {
        let relative_path = format!("part-{i}.parquet");
        let writer = std::io::Cursor::new(vec![]);
        let mut writer = FileWriter::new(
            writer,
            schema.clone(),
            WriteOptions {
                write_statistics: true,
                version: Version::V2,
            },
            None,
        );

        // we write row groups to it
        // writer.write(row_group)

        // and the footer
        writer.end(None)?;
        let (_, mut metadata) = writer.into_inner_and_metadata();

        // once done, we set the relative path of each column chunk
        metadata.row_groups.iter_mut().for_each(|row_group| {
            row_group
                .columns
                .iter_mut()
                .for_each(|column| column.file_path = Some(relative_path.clone()))
        });
        // and collect the metadata
        metadatas.push(metadata);
    }

    // we can then merge their row groups
    let first = metadatas.pop().unwrap();
    let sidecar = metadatas.into_iter().fold(first, |mut acc, metadata| {
        acc.row_groups.extend(metadata.row_groups.into_iter());
        acc
    });

    // and write the metadata on a separate file
    let mut writer = std::io::Cursor::new(vec![]);
    write_metadata_sidecar(&mut writer, &sidecar)?;

    Ok(())
}