I came across a situation where we need to use a plain gRPC client (through the grpc.aio API) to talk to an Arrow Flight gRPC server.
The DoGet call did make it to the server, and we have received a FlightData in response. If our understanding of the Flight gRPC definition is correct, the response contains a flatbuffers message that can somehow be decoded into a RecordBatch.
The client-side code is as follows:
```python
import asyncio
import pathlib

import grpc
import pyarrow as pa
import pyarrow.flight as pf

import flight_pb2, flight_pb2_grpc


async def main():
    ticket = pf.Ticket("tick")
    sock_file = pathlib.Path.cwd().joinpath("arena.sock").resolve()
    async with grpc.aio.insecure_channel(f"unix://{sock_file}") as channel:
        stub = flight_pb2_grpc.FlightServiceStub(channel)
        async for data in stub.DoGet(flight_pb2.Ticket(ticket=ticket.ticket)):
            assert type(data) is flight_pb2.FlightData
            print(data)
            # How to convert data into a RecordBatch?


asyncio.run(main())
```
Currently we are stuck on this last step: decoding the `FlightData` response.
The question is twofold:
1. Are there existing facilities in `pyarrow.flight` that we can use to decode a Python `grpc` object of the `FlightData` type?
2. If #1 is not possible, what other options are there for decoding the content of the `FlightData` and reconstructing a `RecordBatch` from scratch?
The main interest here is to use the asyncio API of the plain gRPC client, which, as far as we can tell, is not supported by the current version of the Arrow Flight client.
Answer:
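There is indeed no utility exposed in `pyarrow.flight` for this.
`FlightData` contains, among other things, the Arrow IPC message header (`data_header`) and body (`data_body`), so you can decode it with `pyarrow.ipc` instead: prepend the IPC continuation token and the header length to turn it back into an encapsulated IPC message. Here's an example: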
```python
import asyncio
import struct

import grpc
import pyarrow as pa
import pyarrow.flight as pf

# Modules generated by protoc from Arrow's Flight.proto
import Flight_pb2, Flight_pb2_grpc


async def main():
    ticket = pf.Ticket("tick")
    async with grpc.aio.insecure_channel("localhost:1234") as channel:
        stub = Flight_pb2_grpc.FlightServiceStub(channel)
        schema = None
        async for data in stub.DoGet(Flight_pb2.Ticket(ticket=ticket.ticket)):
            # 4 bytes: IPC continuation token
            token = b'\xff\xff\xff\xff'
            # 4 bytes: message length (little-endian)
            length = struct.pack('<I', len(data.data_header))
            buf = pa.py_buffer(token + length + data.data_header + data.data_body)
            message = pa.ipc.read_message(buf)
            print(message)
            if schema is None:
                # The first message is the schema.
                # This should work but is unimplemented:
                # print(pa.ipc.read_schema(message))
                schema = pa.ipc.read_schema(buf)
                print(schema)
            else:
                batch = pa.ipc.read_record_batch(message, schema)
                print(batch)
                print(batch.to_pydict())


asyncio.run(main())
```
Server:
```python
import pyarrow as pa
import pyarrow.flight as flight


class TestServer(flight.FlightServerBase):
    def do_get(self, context, ticket):
        table = pa.table([[1, 2, 3, 4]], names=["a"])
        return flight.RecordBatchStream(table)


TestServer("grpc://localhost:1234").serve()
```
There is some discussion about async Flight APIs; please join the dev@ mailing list if you would like to chime in.
