Read parquet
When compiled with the feature `io_parquet`, this crate can be used to read parquet files into arrow.
It makes minimal assumptions on how you decompose CPU- and IO-intensive tasks.
First, some notation:

* `page`: part of a column (e.g. similar to a slice of an `Array`)
* `column chunk`: composed of multiple pages (similar to an `Array`)
* `row group`: a group of columns with the same length (similar to a `Chunk`)
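These concepts are all visible in a file's metadata. As a minimal sketch (the file path is illustrative, not part of any API), the snippet below prints how many rows and column chunks each row group contains:

```rust
use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // illustrative path; replace with a real parquet file
    let mut reader = File::open("example.parquet")?;
    let metadata = read::read_metadata(&mut reader)?;

    // each row group holds one column chunk per column; pages live inside the chunks
    for (index, row_group) in metadata.row_groups.iter().enumerate() {
        println!(
            "row group {} has {} rows across {} column chunks",
            index,
            row_group.num_rows(),
            row_group.columns().len()
        );
    }
    Ok(())
}
```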
Here is how to read a parquet file into chunks of arrow arrays:
```rust
use std::fs::File;
use std::time::SystemTime;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // say we have a file
    use std::env;
    let args: Vec<String> = env::args().collect();
    let file_path = &args[1];
    let mut reader = File::open(file_path)?;

    // we can read its metadata:
    let metadata = read::read_metadata(&mut reader)?;

    // and infer a [`Schema`] from the `metadata`.
    let schema = read::infer_schema(&metadata)?;

    // we can filter the columns we need (here we select all)
    let schema = schema.filter(|_index, _field| true);

    // we can read the statistics of all parquet's row groups (here for each field)
    for field in &schema.fields {
        let statistics = read::statistics::deserialize(field, &metadata.row_groups)?;
        println!("{statistics:#?}");
    }

    // say we found that we only need to read the first two row groups, "0" and "1"
    let row_groups = metadata
        .row_groups
        .into_iter()
        .enumerate()
        .filter(|(index, _)| *index == 0 || *index == 1)
        .map(|(_, row_group)| row_group)
        .collect();

    // we can then read the row groups into chunks
    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None);

    let start = SystemTime::now();
    for maybe_chunk in chunks {
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }
    println!("took: {} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}
```
The example above minimizes memory usage at the expense of mixing IO and CPU tasks on the same thread, which may hurt performance if one of them is a bottleneck.
Parallelism decoupling of CPU from IO
One important aspect of the pages created by the iterator above is that they can cross thread boundaries. Consequently, the thread reading pages from a file (IO-bounded) does not have to be the same thread performing CPU-bounded work (decompressing, decoding, etc.).
The example below assumes that the CPU-bounded work starves the consumption of pages (i.e. decoding is the bottleneck), and that it is therefore advantageous to have a single thread perform all IO-intensive work, delegating all CPU-intensive tasks to separate threads.
```rust
//! Example demonstrating how to read from parquet in parallel using rayon
use std::fs::File;
use std::io::BufReader;
use std::time::SystemTime;

use log::trace;
use rayon::prelude::*;

use arrow2::{
    array::Array,
    chunk::Chunk,
    error::Result,
    io::parquet::read::{self, ArrayIter},
};

mod logger;

/// Advances each iterator in parallel
/// # Panic
/// If the iterators are empty
fn deserialize_parallel(iters: &mut [ArrayIter<'static>]) -> Result<Chunk<Box<dyn Array>>> {
    // CPU-bounded
    let arrays = iters
        .par_iter_mut()
        .map(|iter| iter.next().transpose())
        .collect::<Result<Vec<_>>>()?;

    Chunk::try_new(arrays.into_iter().map(|x| x.unwrap()).collect())
}

fn parallel_read(path: &str, row_group: usize) -> Result<()> {
    // open the file
    let mut file = BufReader::new(File::open(path)?);

    // read Parquet's metadata and infer Arrow schema
    let metadata = read::read_metadata(&mut file)?;
    let schema = read::infer_schema(&metadata)?;

    // select the row group from the metadata
    let row_group = &metadata.row_groups[row_group];

    let chunk_size = 1024 * 8 * 8;

    // read (IO-bounded) all columns into memory (use a subset of the fields to project)
    let mut columns =
        read::read_columns_many(&mut file, row_group, schema.fields, Some(chunk_size))?;

    // deserialize (CPU-bounded) to Arrow in chunks
    let mut num_rows = row_group.num_rows();
    while num_rows > 0 {
        num_rows = num_rows.saturating_sub(chunk_size);
        trace!("[parquet/deserialize][start]");
        let chunk = deserialize_parallel(&mut columns)?;
        trace!("[parquet/deserialize][end][{}]", chunk.len());
        assert!(!chunk.is_empty());
    }
    Ok(())
}

fn main() -> Result<()> {
    log::set_logger(&logger::LOGGER)
        .map(|()| log::set_max_level(log::LevelFilter::Trace))
        .unwrap();

    use std::env;
    let args: Vec<String> = env::args().collect();
    let file_path = &args[1];
    let row_group = args[2].parse::<usize>().unwrap();

    let start = SystemTime::now();
    parallel_read(file_path, row_group)?;
    println!("took: {} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}
```
This can of course be reversed: in configurations where IO is the bottleneck (e.g. when reading over a network), we can use multiple producers of pages, potentially split across multiple file readers, and a single consumer that performs all CPU-intensive work.
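The sketch below illustrates one way to set up such a reversed configuration. It is not part of the crate's API: it assumes a plain `std::sync::mpsc` channel, one producer thread per row group, and an illustrative chunk size and command-line handling. Each producer performs the IO (reading the still-compressed column chunks into memory), while the single consumer performs the CPU-intensive decompression and deserialization by driving the column iterators.

```rust
use std::fs::File;
use std::io::BufReader;
use std::sync::mpsc;
use std::thread;

use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    // illustrative input: a file path passed on the command line
    let file_path = std::env::args().nth(1).unwrap();

    // read the metadata once to learn the schema and the row groups
    let mut reader = BufReader::new(File::open(&file_path)?);
    let metadata = read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&metadata)?;

    // a bounded channel keeps the producers from reading too far ahead of the consumer
    let (tx, rx) = mpsc::sync_channel(2);

    // multiple producers: one IO-bounded thread per row group, each reading its
    // (still compressed) column chunks into memory and sending them onwards
    let mut handles = vec![];
    for row_group in metadata.row_groups {
        let tx = tx.clone();
        let file_path = file_path.clone();
        let fields = schema.fields.clone();
        handles.push(thread::spawn(move || -> Result<()> {
            let mut reader = BufReader::new(File::open(&file_path)?);
            let columns =
                read::read_columns_many(&mut reader, &row_group, fields, Some(1024 * 8 * 8))?;
            tx.send(columns).unwrap();
            Ok(())
        }));
    }
    // drop the original sender; the channel closes once every producer finishes
    drop(tx);

    // single consumer: CPU-bounded decompression and deserialization to arrow
    for mut columns in rx {
        loop {
            // advance every column iterator by one chunk; they are exhausted together
            let arrays: Vec<_> = columns
                .iter_mut()
                .filter_map(|iter| iter.next())
                .collect::<Result<Vec<_>>>()?;
            if arrays.is_empty() {
                break;
            }
            let chunk = Chunk::try_new(arrays)?;
            assert!(!chunk.is_empty());
        }
    }

    for handle in handles {
        handle.join().unwrap()?;
    }
    Ok(())
}
```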
Apache Arrow <-> Apache Parquet
Arrow and Parquet are two different formats that declare different physical and logical types. When reading Parquet, we must infer which Arrow types to read the data into. This inference is based on Parquet's physical, logical and converted types.
When a logical type is defined, we use it as follows:
Parquet physical | Parquet logical | Arrow DataType |
---|---|---|
Int32 | Int8 | Int8 |
Int32 | Int16 | Int16 |
Int32 | Int32 | Int32 |
Int32 | UInt8 | UInt8 |
Int32 | UInt16 | UInt16 |
Int32 | UInt32 | UInt32 |
Int32 | Decimal | Decimal |
Int32 | Date | Date32 |
Int32 | Time(ms) | Time32(ms) |
Int64 | Int64 | Int64 |
Int64 | UInt64 | UInt64 |
Int64 | Time(us) | Time64(us) |
Int64 | Time(ns) | Time64(ns) |
Int64 | Timestamp(_) | Timestamp(_) |
Int64 | Decimal | Decimal |
ByteArray | Utf8 | Utf8 |
ByteArray | JSON | Binary |
ByteArray | BSON | Binary |
ByteArray | ENUM | Binary |
ByteArray | Decimal | Decimal |
FixedLenByteArray | Decimal | Decimal |
When a logical type is not defined but a converted type is defined, we use the equivalent conversion as above, mutatis mutandis.
When neither is defined, we fall back to the physical representation:
Parquet physical | Arrow DataType |
---|---|
Boolean | Boolean |
Int32 | Int32 |
Int64 | Int64 |
Int96 | Timestamp(ns) |
Float | Float32 |
Double | Float64 |
ByteArray | Binary |
FixedLenByteArray | FixedSizeBinary |
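To check which `DataType` this inference produces for a concrete file, one can simply print the inferred schema. A minimal sketch, reusing the functions from the first example (the file path is illustrative):

```rust
use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // illustrative path; replace with a real parquet file
    let mut reader = File::open("example.parquet")?;
    let metadata = read::read_metadata(&mut reader)?;

    // the inference described in the tables above happens here
    let schema = read::infer_schema(&metadata)?;
    for field in &schema.fields {
        println!("{}: {:?}", field.name, field.data_type);
    }
    Ok(())
}
```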