流式传输、序列化和 IPC#
写入和读取流#
Arrow 定义了两种二进制格式来序列化记录批次:
流式格式:用于发送任意长度的记录批次序列。该格式必须从头到尾处理,不支持随机访问
文件或随机访问格式:用于序列化固定数量的记录批次。支持随机访问,因此与内存映射一起使用时非常有用
要遵循本节内容,请确保首先阅读有关内存和 IO 的部分。
使用流#
首先,让我们创建一个小型记录批次:
import pyarrow as pa
data = [
pa.array([1, 2, 3, 4]),
pa.array(['foo', 'bar', 'baz', None]),
pa.array([True, None, False, True])
]
batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
batch.num_rows, batch.num_columns
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
Cell In[1], line 1
----> 1 import pyarrow as pa
3 data = [
4 pa.array([1, 2, 3, 4]),
5 pa.array(['foo', 'bar', 'baz', None]),
6 pa.array([True, None, False, True])
7 ]
10 batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
ModuleNotFoundError: No module named 'pyarrow'
现在,我们可以开始写入包含这些批次中一些数量的流。为此,我们使用 pyarrow.RecordBatchStreamWriter
,它可以写入可写的 pyarrow.NativeFile
对象或可写的 Python 对象。为了方便起见,可以使用 pyarrow.ipc.new_stream()
创建这个对象:
sink = pa.BufferOutputStream()
with pa.ipc.new_stream(sink, batch.schema) as writer:
for i in range(5):
writer.write_batch(batch)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[2], line 1
----> 1 sink = pa.BufferOutputStream()
3 with pa.ipc.new_stream(sink, batch.schema) as writer:
4 for i in range(5):
NameError: name 'pa' is not defined
在这里,我们使用了内存中的 Arrow 缓冲流(sink
),但这也可以是套接字或其他 IO 接收器。
创建 StreamWriter
时,我们传递了 schema
,因为 schema
(列名和类型)必须与此特定流中发送的所有批次相同。现在我们可以执行:
buf = sink.getvalue()
buf.size
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[3], line 1
----> 1 buf = sink.getvalue()
3 buf.size
NameError: name 'sink' is not defined
现在 buf
包含了作为内存字节缓冲区的完整流。我们可以使用 RecordBatchStreamReader
或方便的函数 pyarrow.ipc.open_stream()
来读取这样的流:
with pa.ipc.open_stream(buf) as reader:
schema = reader.schema
batches = [b for b in reader]
schema
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[4], line 1
----> 1 with pa.ipc.open_stream(buf) as reader:
2 schema = reader.schema
3 batches = [b for b in reader]
NameError: name 'pa' is not defined
len(batches)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[5], line 1
----> 1 len(batches)
NameError: name 'batches' is not defined
我们可以检查返回的批次是否与原始输入相同:
batches[0].equals(batch)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[6], line 1
----> 1 batches[0].equals(batch)
NameError: name 'batches' is not defined
一个重要的点是,如果输入源支持零拷贝读取(例如内存映射或 pyarrow.BufferReader
),则返回的批次也是零拷贝的,并且在读取时不会分配任何新内存。
写入和读取随机访问文件#
RecordBatchFileWriter
具有与 RecordBatchStreamWriter
相同的API。您可以使用 new_file()
创建:
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, batch.schema) as writer:
for i in range(10):
writer.write_batch(batch)
buf = sink.getvalue()
buf.size
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[7], line 1
----> 1 sink = pa.BufferOutputStream()
3 with pa.ipc.new_file(sink, batch.schema) as writer:
4 for i in range(10):
NameError: name 'pa' is not defined
RecordBatchFileReader
和 RecordBatchStreamReader
之间的区别在于,输入源必须具有用于随机访问的 seek
方法。流读取器只需要读取操作。我们还可以使用 pyarrow.ipc.open_file()
方法打开文件:
with pa.ipc.open_file(buf) as reader:
num_record_batches = reader.num_record_batches
b = reader.get_batch(3)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[8], line 1
----> 1 with pa.ipc.open_file(buf) as reader:
2 num_record_batches = reader.num_record_batches
5 b = reader.get_batch(3)
NameError: name 'pa' is not defined
因为我们能够访问整个有效负载,我们知道文件中的记录批次数量,并且可以随机读取任何一个。
num_record_batches, b.equals(batch)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[9], line 1
----> 1 num_record_batches, b.equals(batch)
NameError: name 'num_record_batches' is not defined
从流和文件格式读取到 pandas
#
流和文件读取器类具有特殊的 read_pandas
方法,用于简化读取多个记录批次并将它们转换为单个 DataFrame
输出:
with pa.ipc.open_file(buf) as reader:
df = reader.read_pandas()
df[:5]
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[10], line 1
----> 1 with pa.ipc.open_file(buf) as reader:
2 df = reader.read_pandas()
5 df[:5]
NameError: name 'pa' is not defined
高效地写入和读取 Arrow 数据#
作为针对零拷贝和内存映射数据优化的,Arrow 允许轻松读取和写入数组,同时消耗最少的驻留内存。
在写入和读取原始 Arrow 数据时,我们可以使用 Arrow 文件格式或 Arrow 流式格式。
要将数组转储到文件,可以使用 new_file()
,它将提供新的 RecordBatchFileWriter
实例,可用于将数据批次写入该文件。
例如,要写包含 1000 万个整数的数组,我们可以将其分为 1000 个块,每个块包含 10000 个条目:
BATCH_SIZE = 10000
NUM_BATCHES = 1000
schema = pa.schema([pa.field('nums', pa.int32())])
with pa.OSFile('bigfile.arrow', 'wb') as sink:
with pa.ipc.new_file(sink, schema) as writer:
for row in range(NUM_BATCHES):
batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
writer.write(batch)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[11], line 5
1 BATCH_SIZE = 10000
3 NUM_BATCHES = 1000
----> 5 schema = pa.schema([pa.field('nums', pa.int32())])
7 with pa.OSFile('bigfile.arrow', 'wb') as sink:
8 with pa.ipc.new_file(sink, schema) as writer:
NameError: name 'pa' is not defined
记录批次支持多列,因此实际上我们总是写入相当于一个表的数据。
分批写入是有效的,因为理论上我们只需要在内存中保持当前正在写入的批次。但在读取回来时,我们可以更有效,通过直接从磁盘映射数据,避免在读取时分配任何新内存。
在正常情况下,读取我们的文件将消耗几百兆字节的内存:
with pa.OSFile('bigfile.arrow', 'rb') as source:
loaded_array = pa.ipc.open_file(source).read_all()
print("LEN:", len(loaded_array))
print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[12], line 1
----> 1 with pa.OSFile('bigfile.arrow', 'rb') as source:
2 loaded_array = pa.ipc.open_file(source).read_all()
5 print("LEN:", len(loaded_array))
NameError: name 'pa' is not defined
为了更有效地从磁盘读取大数据,我们可以对文件进行内存映射,这样Arrow可以直接引用从磁盘映射的数据,避免需要分配自己的内存。在这种情况下,操作系统将能够懒加载映射的内存,并在压力下将其换出而无需任何写回成本,从而更容易读取比总内存还要大的数组。
with pa.memory_map('bigfile.arrow', 'rb') as source:
loaded_array = pa.ipc.open_file(source).read_all()
print("LEN:", len(loaded_array))
print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[13], line 1
----> 1 with pa.memory_map('bigfile.arrow', 'rb') as source:
2 loaded_array = pa.ipc.open_file(source).read_all()
4 print("LEN:", len(loaded_array))
NameError: name 'pa' is not defined
备注
其他高级 API ,如 read_table()
,也提供了 memory_map
选项。但在这些情况下,内存映射无法帮助减少驻留内存消耗。详情请参阅读取 Parquet 和内存映射。