Write CSV

When compiled with the feature io_csv, this crate can write CSV files.

This crate relies on the csv crate to write well-formed CSV files, so your code should also depend on csv.

The following example writes a batch as a CSV file with the default configuration:

use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    error::Result,
    io::csv::write,
};

fn write_batch<A: AsRef<dyn Array>>(path: &str, chunks: &[Chunk<A>]) -> Result<()> {
    let mut writer = std::fs::File::create(path)?;

    // write the header row (one name per column) using the default options
    let options = write::SerializeOptions::default();
    write::write_header(&mut writer, &["c1"], &options)?;

    // serialize and write each chunk, in order
    chunks
        .iter()
        .try_for_each(|batch| write::write_chunk(&mut writer, batch, &options))
}

fn main() -> Result<()> {
    let array = Int32Array::from(&[
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]);
    let batch = Chunk::try_new(vec![&array as &dyn Array])?;

    write_batch("example.csv", &[batch])
}

Parallelism

This crate exposes functionality to decouple serialization from writing.

In the example above, serialization and writing to the file are done synchronously, on the same thread. However, the two steps typically have different bottlenecks: serialization is often CPU-bound, while writing is often IO-bound. We can decouple them at the cost of higher memory usage.
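To make the trade-off concrete, here is a minimal std-only sketch that separates the two steps. `serialize_rows` and `write_rows` are hypothetical stand-ins (they are not part of this crate), and a `Vec<u8>` plays the role of the file. All rows are serialized into memory before any IO happens; that buffer is the memory cost that buys the decoupling.

```rust
use std::io::{self, Write};

// Hypothetical stand-in for a CSV serializer (CPU-bound step):
// turns one column of i32 values into one byte buffer per row.
fn serialize_rows(values: &[i32]) -> Vec<Vec<u8>> {
    values
        .iter()
        .map(|v| format!("{v}\n").into_bytes())
        .collect()
}

// IO-bound step: write already-serialized rows to any writer.
fn write_rows<W: Write>(writer: &mut W, rows: &[Vec<u8>]) -> io::Result<()> {
    rows.iter().try_for_each(|row| writer.write_all(row))
}

fn main() -> io::Result<()> {
    // an in-memory buffer stands in for a file
    let mut out: Vec<u8> = Vec::new();
    out.write_all(b"c1\n")?;

    // serialization completes before writing starts; the rows live in memory meanwhile
    let rows = serialize_rows(&[0, 1, 2]);
    write_rows(&mut out, &rows)?;

    assert_eq!(out, b"c1\n0\n1\n2\n");
    Ok(())
}
```

Because the two functions no longer call each other, the serialization step can be moved to another thread without touching the writing step.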

Suppose we know that serialization is the CPU bottleneck and we would like to offload it to other threads, at the cost of higher memory usage. This can be achieved as follows (with two batches, for simplicity):

use std::io::Write;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;

use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    error::Result,
    io::csv::write,
};

fn parallel_write(path: &str, batches: [Chunk<Box<dyn Array>>; 2]) -> Result<()> {
    let options = write::SerializeOptions::default();

    // write the header on the calling thread
    let mut writer = std::fs::File::create(path)?;
    write::write_header(&mut writer, &["c1"], &options)?;

    // prepare a channel to send serialized records, tagged with their batch index, from threads
    let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
    let mut children = Vec::new();

    for (id, batch) in batches.iter().enumerate() {
        // the sender endpoint can be cloned: one per thread
        let thread_tx = tx.clone();

        let options = options.clone();
        let batch = batch.clone(); // note: this is cheap
        let child = thread::spawn(move || {
            // CPU-bound: serialize the batch into rows of bytes
            let rows = write::serialize(&batch, &options).unwrap();
            thread_tx.send((id, rows)).unwrap();
        });

        children.push(child);
    }

    // threads may finish in any order: block until every batch is serialized,
    // storing each result at its batch's index
    let mut results = vec![None; batches.len()];
    for _ in 0..batches.len() {
        let (id, records) = rx.recv().unwrap();
        results[id] = Some(records);
    }

    // IO-bound: write the batches in their original order
    for records in results.into_iter().flatten() {
        records.iter().try_for_each(|row| writer.write_all(row))?;
    }

    for child in children {
        child.join().expect("child thread panicked");
    }

    Ok(())
}

fn main() -> Result<()> {
    let array = Int32Array::from(&[
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]);
    let columns = Chunk::new(vec![array.boxed()]);

    parallel_write("example.csv", [columns.clone(), columns])
}
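A channel is one way to move serialization off the calling thread. As an alternative design (a sketch, not this crate's API), std's scoped threads (Rust 1.63+) can borrow the batches without cloning them and keep the output in input order simply by joining the handles in spawn order. Here `serialize` is a hypothetical stand-in for `write::serialize` that works on plain `i32` slices, so the sketch runs with the standard library alone.

```rust
use std::io::{self, Write};
use std::thread;

// Hypothetical stand-in for `write::serialize`: turns one batch of i32
// values into one CSV-formatted byte buffer per row.
fn serialize(batch: &[i32]) -> Vec<Vec<u8>> {
    batch.iter().map(|v| format!("{v}\n").into_bytes()).collect()
}

// Serializes every batch on its own scoped thread, then writes the
// results in input order by joining the handles in spawn order.
fn parallel_write<W: Write>(writer: &mut W, batches: &[Vec<i32>]) -> io::Result<()> {
    writer.write_all(b"c1\n")?;
    thread::scope(|s| -> io::Result<()> {
        // spawn all serialization work first so it runs concurrently;
        // scoped threads may borrow `batches` directly
        let handles: Vec<_> = batches
            .iter()
            .map(|batch| s.spawn(move || serialize(batch)))
            .collect();
        // batch i is written only after batches 0..i, while later
        // batches may still be serializing in the background
        for handle in handles {
            let rows = handle.join().expect("serialization thread panicked");
            rows.iter().try_for_each(|row| writer.write_all(row))?;
        }
        Ok(())
    })
}

fn main() -> io::Result<()> {
    let batches = vec![vec![0, 1, 2], vec![3, 4, 5, 6]];
    // an in-memory buffer stands in for a file
    let mut out: Vec<u8> = Vec::new();
    parallel_write(&mut out, &batches)?;
    assert_eq!(out, b"c1\n0\n1\n2\n3\n4\n5\n6\n");
    Ok(())
}
```

Joining in spawn order means a slow first batch delays the writing of faster later ones, whose serialized rows wait in memory in the meantime: the same memory-for-throughput trade-off described for the channel version.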