Read Arrow streams

When compiled with the io_ipc feature, this crate can be used to read Arrow streams.

The example below shows how to read from a stream:

use std::net::TcpStream;
use std::thread;
use std::time::Duration;

use arrow2::array::{Array, Int64Array};
use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Connects to the producer listening on `ADDRESS`, reads the Arrow IPC
/// stream metadata, and then consumes the stream, printing a running count of
/// the batches received.
///
/// # Errors
///
/// Returns an error if the TCP connection cannot be established, if the
/// stream metadata cannot be read, or if reading from the stream fails.
fn main() -> Result<()> {
    const ADDRESS: &str = "127.0.0.1:12989";
    // Pause between polls while the stream reports that it is waiting.
    const POLL_INTERVAL: Duration = Duration::from_millis(2000);

    let mut reader = TcpStream::connect(ADDRESS)?;
    let metadata = read::read_stream_metadata(&mut reader)?;
    let stream = read::StreamReader::new(&mut reader, metadata);

    let mut idx = 0;
    // The loop ends when the reader yields `None`.
    for state in stream {
        match state {
            Ok(read::StreamState::Some(_batch)) => {
                idx += 1;
                println!("batch: {:?}", idx);
            }
            // NOTE(review): `Waiting` presumably means no complete message is
            // available yet — confirm against the arrow2 documentation.
            Ok(read::StreamState::Waiting) => thread::sleep(POLL_INTERVAL),
            Err(e) => {
                // Report how many batches were read, then stop: polling again
                // after a read failure would likely just repeat the same error.
                println!("{:?} ({})", e, idx);
                return Err(e);
            }
        }
    }

    Ok(())
}

The stream can be produced, for example, by the following pyarrow script:

import pyarrow as pa
from time import sleep
import socket

# Set up the data exchange socket.
# The context manager closes the listening socket when the script finishes.
with socket.socket() as sk:
    # Allow the address to be rebound right after a previous run, so the
    # example can be restarted without waiting for the OS to release the port.
    sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sk.bind(("127.0.0.1", 12989))
    sk.listen()

    # Three columns (integers, strings, booleans), two of them with nulls.
    data = [
        pa.array([1, 2, 3, 4]),
        pa.array(["foo", "bar", "baz", None]),
        pa.array([True, None, False, True]),
    ]

    batch = pa.record_batch(data, names=["f0", "f1", "f2"])

    # Accept incoming connection and stream the data away.
    connection, address = sk.accept()
    # On exit the file wrapper is closed first (flushing its buffer), then the
    # connection itself.
    with connection, connection.makefile("wb") as dummy_socket_file:
        with pa.RecordBatchStreamWriter(dummy_socket_file, batch.schema) as writer:
            for i in range(50):
                writer.write_batch(batch)
                # The file wrapper is buffered; flush so each batch reaches the
                # consumer as soon as it is written.
                dummy_socket_file.flush()
                sleep(1)

Both programs can be run together via:

# Launch the pyarrow producer in the background and remember its PID.
python main.py &
producer_pid=$!

# Wait for metadata to be available before starting the consumer.
sleep 1
cargo run

# Stop the background producer once the consumer has finished.
kill "$producer_pid"