Read Arrow streams
When compiled with the feature `io_ipc`, this crate can be used to read Arrow streams.
The example below shows how to read from a stream:
use std::net::TcpStream; use std::thread; use std::time::Duration; use arrow2::array::{Array, Int64Array}; use arrow2::datatypes::DataType; use arrow2::error::Result; use arrow2::io::ipc::read; fn main() -> Result<()> { const ADDRESS: &str = "127.0.0.1:12989"; let mut reader = TcpStream::connect(ADDRESS)?; let metadata = read::read_stream_metadata(&mut reader)?; let mut stream = read::StreamReader::new(&mut reader, metadata); let mut idx = 0; loop { match stream.next() { Some(x) => match x { Ok(read::StreamState::Some(b)) => { idx += 1; println!("batch: {:?}", idx) } Ok(read::StreamState::Waiting) => thread::sleep(Duration::from_millis(2000)), Err(l) => println!("{:?} ({})", l, idx), }, None => break, }; } Ok(()) }
The stream can be produced, for example, by the following pyarrow script (saved as `main.py`):
import pyarrow as pa
from time import sleep
import socket
# Set up the data exchange socket. The `with` block closes the listening
# socket when the script finishes, even if an exception is raised.
with socket.socket() as sk:
    sk.bind(("127.0.0.1", 12989))
    sk.listen()

    # One record batch with an integer, a string and a boolean column.
    data = [
        pa.array([1, 2, 3, 4]),
        pa.array(["foo", "bar", "baz", None]),
        pa.array([True, None, False, True]),
    ]
    batch = pa.record_batch(data, names=["f0", "f1", "f2"])

    # Accept incoming connection and stream the data away
    connection, _address = sk.accept()
    # Context managers exit in reverse order: the writer is closed first, then
    # the buffered socket file (flushing anything still buffered), then the
    # connection itself.
    with connection, connection.makefile("wb") as dummy_socket_file:
        with pa.RecordBatchStreamWriter(dummy_socket_file, batch.schema) as writer:
            for _ in range(50):
                writer.write_batch(batch)
                # Pause between batches so the consumer sees data arrive over time.
                sleep(1)
Both programs can then be run together via:
# Launch the pyarrow producer in the background and remember its PID.
python main.py &
PRODUCER_PID=$!

sleep 1 # wait for metadata to be available.

# Run the Rust consumer in the foreground.
cargo run

# Stop the background producer once the consumer has returned.
kill "$PRODUCER_PID"