# Read parquet
When compiled with the feature `io_parquet`, this crate can be used to read parquet files into arrow.
It makes minimal assumptions on how you decompose CPU- and IO-intensive tasks.
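For example, in `Cargo.toml` (a sketch: the version number is illustrative, and `io_parquet_compression` is an assumption here, enabling the codecs needed to read compressed files):

```toml
[dependencies]
# version is illustrative; `io_parquet_compression` additionally enables
# the compression codecs needed to read compressed parquet files
arrow2 = { version = "0.17", features = ["io_parquet", "io_parquet_compression"] }
```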
First, some notation:

* `page`: part of a column (e.g. similar to a slice of an `Array`)
* `column chunk`: composed of multiple pages (similar to an `Array`)
* `row group`: a group of columns with the same length (similar to a `Chunk`)
Here is how to read a parquet file's metadata and statistics, select a subset of its row groups, and read them into chunks of arrow arrays:
```rust
use std::fs::File;
use std::time::SystemTime;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // say we have a file
    use std::env;
    let args: Vec<String> = env::args().collect();
    let file_path = &args[1];
    let mut reader = File::open(file_path)?;

    // we can read its metadata:
    let metadata = read::read_metadata(&mut reader)?;

    // and infer a [`Schema`] from the `metadata`.
    let schema = read::infer_schema(&metadata)?;

    // we can filter the columns we need (here we select all)
    let schema = schema.filter(|_index, _field| true);

    // we can read the statistics of all parquet's row groups (here for the first field)
    let statistics = read::statistics::deserialize(&schema.fields[0], &metadata.row_groups)?;
    println!("{:#?}", statistics);

    // say we found that we only need to read the first two row groups, "0" and "1"
    let row_groups = metadata
        .row_groups
        .into_iter()
        .enumerate()
        .filter(|(index, _)| *index == 0 || *index == 1)
        .map(|(_, row_group)| row_group)
        .collect();

    // we can then read the row groups into chunks
    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None, None);

    let start = SystemTime::now();
    for maybe_chunk in chunks {
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }
    println!("took: {} ms", start.elapsed().unwrap().as_millis());
    Ok(())
}
```
The example above minimizes memory usage at the expense of mixing IO and CPU tasks on the same thread, which may hurt performance if one of them is a bottleneck.
For single-threaded reading, the buffers used to read and decompress pages can be re-used. This crate offers an API that encapsulates the above logic:
```rust
{{#include ../../../examples/parquet_read_record.rs}}
```
## Parallelism decoupling of CPU from IO
One important aspect of the pages created by the iterator above is that they can cross thread boundaries. Consequently, the thread reading pages from a file (IO-bound) does not have to be the same thread performing CPU-bound work (decompressing, decoding, etc.).
The example below assumes that CPU-bound work is the bottleneck (i.e. that it starves the consumption of pages), and thus that it is advantageous to have a single thread perform all IO-intensive work, delegating all CPU-intensive tasks to separate threads.
```rust
{{#include ../../../examples/parquet_read_parallel.rs}}
```
This can of course be reversed: in configurations where IO is the bottleneck (e.g. when a network is involved), we can use multiple producers of pages, potentially divided across file readers, and a single consumer that performs all CPU-intensive work.
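As a rough illustration of that reversed topology, here is a minimal sketch using only the standard library. `read_compressed_pages` and `decode_pages` are hypothetical stand-ins for the IO-bound and CPU-bound halves of the work, not functions of this crate:

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-ins: reading raw (still compressed) pages is IO-bound;
// decompressing and deserializing them into arrow is CPU-bound.
fn read_compressed_pages(_path: &str) -> Vec<Vec<u8>> {
    vec![] // IO-bound work would happen here
}

fn decode_pages(_pages: Vec<Vec<u8>>) {
    // CPU-bound work would happen here
}

fn main() {
    let (sender, receiver) = mpsc::channel();

    // multiple producers, one per file, each performing IO-bound work
    let producers: Vec<_> = ["f1.parquet", "f2.parquet"]
        .into_iter()
        .map(|path| {
            let sender = sender.clone();
            thread::spawn(move || sender.send(read_compressed_pages(path)).unwrap())
        })
        .collect();
    // drop the original sender so the channel closes once all producers finish
    drop(sender);

    // a single consumer performing all CPU-bound work
    for pages in receiver {
        decode_pages(pages);
    }
    for producer in producers {
        producer.join().unwrap();
    }
}
```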
## Apache Arrow <-> Apache Parquet
Arrow and Parquet are two different formats that declare different physical and logical types. When reading Parquet, we must infer which types to read the data into. This inference is based on Parquet's physical, logical and converted types.
When a logical type is defined, we use it as follows:
| Parquet | Parquet logical | DataType |
|---|---|---|
| Int32 | Int8 | Int8 |
| Int32 | Int16 | Int16 |
| Int32 | Int32 | Int32 |
| Int32 | UInt8 | UInt8 |
| Int32 | UInt16 | UInt16 |
| Int32 | UInt32 | UInt32 |
| Int32 | Decimal | Decimal |
| Int32 | Date | Date32 |
| Int32 | Time(ms) | Time32(ms) |
| Int64 | Int64 | Int64 |
| Int64 | UInt64 | UInt64 |
| Int64 | Time(us) | Time64(us) |
| Int64 | Time(ns) | Time64(ns) |
| Int64 | Timestamp(_) | Timestamp(_) |
| Int64 | Decimal | Decimal |
| ByteArray | Utf8 | Utf8 |
| ByteArray | JSON | Binary |
| ByteArray | BSON | Binary |
| ByteArray | ENUM | Binary |
| ByteArray | Decimal | Decimal |
| FixedLenByteArray | Decimal | Decimal |
When a logical type is not defined but a converted type is defined, we use the equivalent conversion as above, mutatis mutandis.
When neither is defined, we fall back to the physical representation:
| Parquet | DataType |
|---|---|
| Boolean | Boolean |
| Int32 | Int32 |
| Int64 | Int64 |
| Int96 | Timestamp(ns) |
| Float | Float32 |
| Double | Float64 |
| ByteArray | Binary |
| FixedLenByteArray | FixedSizeBinary |
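To check which `DataType` the inference picked for each column of a concrete file, one can print the inferred schema. This is a small sketch reusing the API from the first example; `data.parquet` is a placeholder path:

```rust
use std::fs::File;

use arrow2::error::Error;
use arrow2::io::parquet::read;

fn main() -> Result<(), Error> {
    // "data.parquet" is a placeholder path
    let mut reader = File::open("data.parquet")?;
    let metadata = read::read_metadata(&mut reader)?;

    // `infer_schema` applies the mappings described in the tables above
    let schema = read::infer_schema(&metadata)?;
    for field in &schema.fields {
        println!("{}: {:?}", field.name, field.data_type);
    }
    Ok(())
}
```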