# Write CSV
When compiled with the feature `io_csv`, you can use this crate to write CSV files.
This crate relies on the `csv` crate to write well-formed CSV files, so your code should also depend on it.
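Enabling the feature in `Cargo.toml` might look like the following (the version numbers are illustrative, not pinned by this guide):

```toml
[dependencies]
arrow2 = { version = "0.17", features = ["io_csv"] }
csv = "1"
```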
The following example writes a batch as a CSV file with the default configuration:
```rust
use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    error::Result,
    io::csv::write,
};

fn write_batch<A: AsRef<dyn Array>>(path: &str, columns: &[Chunk<A>]) -> Result<()> {
    let mut writer = std::fs::File::create(path)?;

    let options = write::SerializeOptions::default();
    write::write_header(&mut writer, &["c1"], &options)?;

    columns
        .iter()
        .try_for_each(|batch| write::write_chunk(&mut writer, batch, &options))
}

fn main() -> Result<()> {
    let array = Int32Array::from(&[
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]);

    let batch = Chunk::try_new(vec![&array as &dyn Array])?;

    write_batch("example.csv", &[batch])
}
```
## Parallelism
This crate exposes functionality to decouple serialization from writing.
In the example above, serialization and writing to a file are done synchronously. However, the two typically have different bounds: serialization is often CPU-bound, while writing is often IO-bound. We can decouple them, trading parallelism for higher memory usage.
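Before applying it to Arrow data, the pattern can be sketched with the standard library alone: worker threads do the (CPU-bound) serialization and send the results over a channel, while a single thread does the (IO-bound) writing. `parallel_serialize` and the plain `i32` rows below are illustrative stand-ins, not arrow2 APIs:

```rust
use std::io::Write;
use std::sync::mpsc;
use std::thread;

// Serializes each batch on its own thread, then writes the results in
// batch order on the calling thread.
fn parallel_serialize(batches: Vec<Vec<i32>>) -> Vec<u8> {
    let (tx, rx) = mpsc::channel();

    for (id, batch) in batches.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // CPU-bound step: turn each value into a CSV row.
            let rows: Vec<String> = batch.iter().map(|v| format!("{v}\n")).collect();
            // Tag the rows with the batch id so order can be restored.
            tx.send((id, rows)).unwrap();
        });
    }
    // Drop the original sender so `rx.iter()` ends when all threads finish.
    drop(tx);

    // Threads may finish in any order; sort by batch id to preserve it.
    let mut parts: Vec<(usize, Vec<String>)> = rx.iter().collect();
    parts.sort_by_key(|&(id, _)| id);

    // IO-bound step: write everything sequentially (here into memory).
    let mut out = Vec::new();
    for (_, rows) in parts {
        for row in rows {
            out.write_all(row.as_bytes()).unwrap();
        }
    }
    out
}
```

The channel decouples the producers (serializers) from the consumer (writer); buffering the serialized rows is where the extra memory goes.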
Suppose that we know we are CPU-bound during serialization and would like to offload that workload to other threads, at the cost of higher memory usage. We can achieve this as follows (two batches for simplicity):
```rust
use std::io::Write;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;

use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    error::Result,
    io::csv::write,
};

fn parallel_write(path: &str, batches: [Chunk<Box<dyn Array>>; 2]) -> Result<()> {
    let options = write::SerializeOptions::default();

    // write a header
    let mut writer = std::fs::File::create(path)?;
    write::write_header(&mut writer, &["c1"], &options)?;

    // prepare a channel to send serialized records from threads
    let (tx, rx): (Sender<_>, Receiver<_>) = mpsc::channel();
    let mut children = Vec::new();

    (0..2).for_each(|id| {
        // The sender endpoint can be cloned
        let thread_tx = tx.clone();

        let options = options.clone();
        let batch = batches[id].clone(); // note: this is cheap

        let child = thread::spawn(move || {
            let rows = write::serialize(&batch, &options).unwrap();
            thread_tx.send(rows).unwrap();
        });

        children.push(child);
    });

    for _ in 0..2 {
        // block: assumes that the order of batches matters.
        let records = rx.recv().unwrap();
        records.iter().try_for_each(|row| writer.write_all(row))?
    }

    for child in children {
        child.join().expect("child thread panicked");
    }

    Ok(())
}

fn main() -> Result<()> {
    let array = Int32Array::from(&[
        Some(0),
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]);
    let columns = Chunk::new(vec![array.boxed()]);

    parallel_write("example.csv", [columns.clone(), columns])
}
```