Read parquet

When compiled with the feature io_parquet, this crate can be used to read parquet files into arrow. It makes minimal assumptions about how you want to decompose CPU- and IO-intensive tasks.

First, some notation:

  • page: part of a column (e.g. similar to a slice of an Array)
  • column chunk: composed of multiple pages (similar to an Array)
  • row group: a group of columns with the same length (similar to a Chunk)
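
These three levels can be inspected directly from a file's metadata. Below is a minimal sketch that prints them, assuming a parquet file at the hypothetical path data.parquet (num_rows and columns come from the re-exported parquet2 row-group metadata):

use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    let mut reader = File::open("data.parquet")?;

    let metadata = read::read_metadata(&mut reader)?;

    // a file contains multiple row groups; each row group contains
    // one column chunk per column, and each column chunk contains pages
    for (index, row_group) in metadata.row_groups.iter().enumerate() {
        println!(
            "row group {index}: {} rows, {} column chunks",
            row_group.num_rows(),
            row_group.columns().len()
        );
    }
    Ok(())
}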

Here is how to read a parquet file into chunks of arrays, selecting which columns and row groups to read:

use std::fs::File;
use std::time::SystemTime;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // say we have a file
    use std::env;
    let args: Vec<String> = env::args().collect();
    let file_path = &args[1];
    let mut reader = File::open(file_path)?;

    // we can read its metadata:
    let metadata = read::read_metadata(&mut reader)?;

    // and infer a [`Schema`] from the `metadata`.
    let schema = read::infer_schema(&metadata)?;

    // we can filter the columns we need (here we select all)
    let schema = schema.filter(|_index, _field| true);

    // we can read the statistics of all parquet's row groups (here for each field)
    for field in &schema.fields {
        let statistics = read::statistics::deserialize(field, &metadata.row_groups)?;
        println!("{statistics:#?}");
    }

    // say we found that we only need to read the first two row groups, "0" and "1"
    let row_groups = metadata
        .row_groups
        .into_iter()
        .enumerate()
        .filter(|(index, _)| *index == 0 || *index == 1)
        .map(|(_, row_group)| row_group)
        .collect();

    // we can then read the row groups into chunks
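    // (after the schema: an optional chunk size, an optional row limit,
    // and optional page-index filters)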
    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None);

    let start = SystemTime::now();
    for maybe_chunk in chunks {
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }
    println!("took: {} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}

The example above minimizes memory usage at the expense of mixing IO and CPU tasks on the same thread, which may hurt performance if one of them is a bottleneck.

Parallelism decoupling of CPU from IO

One important aspect of the pages created by the iterator above is that they can cross thread boundaries. Consequently, the thread reading pages from a file (IO-bounded) does not have to be the same thread performing CPU-bounded work (decompressing, decoding, etc.).

The example below assumes that CPU-intensive work (decompressing and decoding) starves the consumption of pages, and that it is therefore advantageous to have a single thread perform all IO-intensive work while delegating all CPU-intensive tasks to separate threads.

//! Example demonstrating how to read from parquet in parallel using rayon
use std::fs::File;
use std::io::BufReader;
use std::time::SystemTime;

use log::trace;
use rayon::prelude::*;

use arrow2::{
    array::Array,
    chunk::Chunk,
    error::Result,
    io::parquet::read::{self, ArrayIter},
};

// local module providing the `log::Log` implementation used in `main` (not shown here)
mod logger;

/// Advances each iterator in parallel
/// # Panics
/// Panics if any of the iterators is exhausted (i.e. returns `None`)
fn deserialize_parallel(iters: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
    // CPU-bounded
    let arrays = iters
        .par_iter_mut()
        .map(|iter| iter.next().transpose())
        .collect::<Result<Vec<_>>>()?;

    Chunk::try_new(arrays.into_iter().map(|x| x.unwrap()).collect())
}

fn parallel_read(path: &str, row_group: usize) -> Result<()> {
    // open the file
    let mut file = BufReader::new(File::open(path)?);

    // read Parquet's metadata and infer Arrow schema
    let metadata = read::read_metadata(&mut file)?;
    let schema = read::infer_schema(&metadata)?;

    // select the row group from the metadata
    let row_group = &metadata.row_groups[row_group];

    let chunk_size = 1024 * 8 * 8;

    // read (IO-bounded) all columns into memory (use a subset of the fields to project)
    let mut columns =
        read::read_columns_many(&mut file, row_group, schema.fields, Some(chunk_size))?;

    // deserialize (CPU-bounded) to Arrow in chunks
    let mut num_rows = row_group.num_rows();
    while num_rows > 0 {
        num_rows = num_rows.saturating_sub(chunk_size);
        trace!("[parquet/deserialize][start]");
        let chunk = deserialize_parallel(&mut columns)?;
        trace!("[parquet/deserialize][end][{}]", chunk.len());
        assert!(!chunk.is_empty());
    }
    Ok(())
}

fn main() -> Result<()> {
    log::set_logger(&logger::LOGGER)
        .map(|()| log::set_max_level(log::LevelFilter::Trace))
        .unwrap();

    use std::env;
    let args: Vec<String> = env::args().collect();
    let file_path = &args[1];
    let row_group = args[2].parse::<usize>().unwrap();

    let start = SystemTime::now();
    parallel_read(file_path, row_group)?;
    println!("took: {} ms", start.elapsed().unwrap().as_millis());

    Ok(())
}
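
Assuming this is compiled as a binary, it takes the file path and the row group index as positional arguments, e.g. cargo run --release -- data.parquet 0 for a hypothetical data.parquet.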

This can of course be reversed: in configurations where IO is the bottleneck (e.g. when a network is involved), we can use multiple producers of pages, potentially divided among file readers, and a single consumer that performs all CPU-intensive work.
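
Below is a minimal sketch of such an inversion using scoped threads and a channel. It assumes a parquet file at the hypothetical path data.parquet and uses read_columns and to_deserializer, the lower-level counterparts of the read_columns_many call above; their exact signatures vary across arrow2 versions, so treat this as an outline rather than a drop-in implementation:

use std::fs::File;
use std::sync::mpsc;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    let path = "data.parquet";

    let mut reader = File::open(path)?;
    let metadata = read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&metadata)?;
    let row_group = &metadata.row_groups[0];

    let chunk_size = Some(1024 * 8 * 8);

    let (sender, receiver) = mpsc::channel();

    std::thread::scope(|scope| -> Result<(), Error> {
        // multiple producers: one IO thread per field, each with its own
        // file handle, reading the raw (compressed) column chunks
        for field in schema.fields.iter().cloned() {
            let sender = sender.clone();
            scope.spawn(move || {
                let mut reader = File::open(path).unwrap();
                let columns =
                    read::read_columns(&mut reader, row_group.columns(), &field.name).unwrap();
                // send the compressed bytes; no CPU-intensive work happens here
                sender.send((field, columns)).unwrap();
            });
        }
        // drop the original sender so the receiver stops once all producers finish
        drop(sender);

        // a single consumer decompresses and decodes (CPU-bounded)
        for (field, columns) in receiver {
            let iter = read::to_deserializer(columns, field, row_group.num_rows(), chunk_size)?;
            for maybe_array in iter {
                let array = maybe_array?;
                assert!(array.len() > 0);
            }
        }
        Ok(())
    })
}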

Apache Arrow <-> Apache Parquet

Arrow and Parquet are two different formats that declare different physical and logical types. When reading Parquet, we must infer which Arrow types to read the data into. This inference is based on Parquet's physical, logical and converted types.

When a logical type is defined, we use it as follows:

| Parquet physical  | Parquet logical | Arrow DataType |
|-------------------|-----------------|----------------|
| Int32             | Int8            | Int8           |
| Int32             | Int16           | Int16          |
| Int32             | Int32           | Int32          |
| Int32             | UInt8           | UInt8          |
| Int32             | UInt16          | UInt16         |
| Int32             | UInt32          | UInt32         |
| Int32             | Decimal         | Decimal        |
| Int32             | Date            | Date32         |
| Int32             | Time(ms)        | Time32(ms)     |
| Int64             | Int64           | Int64          |
| Int64             | UInt64          | UInt64         |
| Int64             | Time(us)        | Time64(us)     |
| Int64             | Time(ns)        | Time64(ns)     |
| Int64             | Timestamp(_)    | Timestamp(_)   |
| Int64             | Decimal         | Decimal        |
| ByteArray         | Utf8            | Utf8           |
| ByteArray         | JSON            | Binary         |
| ByteArray         | BSON            | Binary         |
| ByteArray         | ENUM            | Binary         |
| ByteArray         | Decimal         | Decimal        |
| FixedLenByteArray | Decimal         | Decimal        |

When a logical type is not defined but a converted type is defined, we use the equivalent conversion as above, mutatis mutandis.

When neither is defined, we fall back to the physical representation:

| Parquet physical  | Arrow DataType  |
|-------------------|-----------------|
| Boolean           | Boolean         |
| Int32             | Int32           |
| Int64             | Int64           |
| Int96             | Timestamp(ns)   |
| Float             | Float32         |
| Double            | Float64         |
| ByteArray         | Binary          |
| FixedLenByteArray | FixedSizeBinary |
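
To see which of these rules applied to a concrete file, one can inspect the inferred schema. A small sketch, again assuming a hypothetical data.parquet:

use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    let mut reader = File::open("data.parquet")?;
    let metadata = read::read_metadata(&mut reader)?;

    // `infer_schema` applies the mapping rules above to each parquet column
    let schema = read::infer_schema(&metadata)?;
    for field in &schema.fields {
        println!("{}: {:?}", field.name, field.data_type);
    }
    Ok(())
}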