CSV reader

When compiled with feature io_csv, you can use this crate to read CSV files. This crate makes minimal assumptions on how you want to read a CSV, and offers a large degree of customization to it, along with a useful default.

Background

There are two CPU-intensive tasks in reading a CSV file:

  • split the CSV file into rows, which includes parsing quotes and delimiters, and is necessary to seek to a given row.
  • parse a set of CSV rows (bytes) into a Arrays.

Parsing bytes into values is more expensive than interpreting lines. As such, it is generally advantageous to have multiple readers of a single file that scan different parts of the file (within IO constraints).

This crate relies on the crate csv to scan and seek CSV files, and your code also needs such a dependency. With that said, arrow2 makes no assumptions as to how to efficiently read the CSV: as a single reader per file or multiple readers.

As an example, the following infers the schema and reads a CSV by re-using the same reader:

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::csv::read;

fn read_path(path: &str, projection: Option<&[usize]>) -> Result<Chunk<Box<dyn Array>>> {
    // Create a CSV reader. This is typically created on the thread that reads the file and
    // thus owns the read head.
    let mut reader = read::ReaderBuilder::new().from_path(path)?;

    // Infers the fields using the default inferer. The inferer is just a function that maps bytes
    // to a `DataType`.
    let (fields, _) = read::infer_schema(&mut reader, None, true, &read::infer)?;

    // allocate space to read from CSV to. The size of this vec denotes how many rows are read.
    let mut rows = vec![read::ByteRecord::default(); 100];

    // skip 0 (excluding the header) and read up to 100 rows.
    // this is IO-intensive and performs minimal CPU work. In particular,
    // no deserialization is performed.
    let rows_read = read::read_rows(&mut reader, 0, &mut rows)?;
    let rows = &rows[..rows_read];

    // parse the rows into a `Chunk`. This is CPU-intensive, has no IO,
    // and can be performed on a different thread by passing `rows` through a channel.
    // `deserialize_column` is a function that maps rows and a column index to an Array
    read::deserialize_batch(rows, &fields, projection, 0, read::deserialize_column)
}

fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    let file_path = &args[1];

    let batch = read_path(file_path, None)?;
    println!("{:?}", batch);
    Ok(())
}

Orchestration and parallelization

Because csv's API is synchronous, the functions above represent the "minimal unit of synchronous work", IO and CPU. Note that rows above are Send, which implies that it is possible to run parse on a separate thread, thereby maximizing IO throughput. The example below shows how to do just that:

use crossbeam_channel::unbounded;

use std::thread;
use std::time::SystemTime;

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::{error::Result, io::csv::read};

fn parallel_read(path: &str) -> Result<Vec<Chunk<Box<dyn Array>>>> {
    let batch_size = 100;
    let has_header = true;
    let projection = None;

    // prepare a channel to send serialized records from threads
    let (tx, rx) = unbounded();

    let mut reader = read::ReaderBuilder::new().from_path(path)?;
    let (fields, _) =
        read::infer_schema(&mut reader, Some(batch_size * 10), has_header, &read::infer)?;
    let fields = Box::new(fields);

    let start = SystemTime::now();
    // spawn a thread to produce `Vec<ByteRecords>` (IO bounded)
    let child = thread::spawn(move || {
        let mut line_number = 0;
        let mut size = 1;
        while size > 0 {
            let mut rows = vec![read::ByteRecord::default(); batch_size];
            let rows_read = read::read_rows(&mut reader, 0, &mut rows).unwrap();
            rows.truncate(rows_read);
            line_number += rows.len();
            size = rows.len();
            tx.send((rows, line_number)).unwrap();
        }
    });

    let mut children = Vec::new();
    // use 3 consumers of to decompress, decode and deserialize.
    for _ in 0..3 {
        let rx_consumer = rx.clone();
        let consumer_fields = fields.clone();
        let child = thread::spawn(move || {
            let (rows, line_number) = rx_consumer.recv().unwrap();
            let start = SystemTime::now();
            println!("consumer start - {}", line_number);
            let batch = read::deserialize_batch(
                &rows,
                &consumer_fields,
                projection,
                0,
                read::deserialize_column,
            )
            .unwrap();
            println!(
                "consumer end - {:?}: {}",
                start.elapsed().unwrap(),
                line_number,
            );
            batch
        });
        children.push(child);
    }

    child.join().expect("child thread panicked");

    let batches = children
        .into_iter()
        .map(|x| x.join().unwrap())
        .collect::<Vec<_>>();
    println!("Finished - {:?}", start.elapsed().unwrap());

    Ok(batches)
}

fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();
    let file_path = &args[1];

    let batches = parallel_read(file_path)?;
    for batch in batches {
        println!("{}", batch.len())
    }
    Ok(())
}

Async

This crate also supports reading from a CSV asynchronously through the csv-async crate. The example below demonstrates this:

use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::csv::read_async::*;

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    let file_path = &args[1];

    let file = File::open(file_path).await?.compat();

    let mut reader = AsyncReaderBuilder::new().create_reader(file);

    let (fields, _) = infer_schema(&mut reader, None, true, &infer).await?;

    let mut rows = vec![ByteRecord::default(); 100];
    let rows_read = read_rows(&mut reader, 0, &mut rows).await?;

    let columns = deserialize_batch(&rows[..rows_read], &fields, None, 0, deserialize_column)?;
    println!("{:?}", columns.arrays()[0]);
    Ok(())
}

Note that the deserialization should be performed on a separate thread to not block (see also here), which this example does not show.

Customization

In the code above, parser and infer allow for customization: they declare how rows of bytes should be inferred (into a logical type), and processed (into a value of said type). They offer good default options, but you can customize the inference and parsing to your own needs. You can also of course decide to parse everything into memory as Utf8Array and delay any data transformation.