У меня есть несколько большой (~ 20 ГБ) набор данных в паркетном формате. Я хотел бы прочитать определенные разделы из набора данных, используя pyarrow
. Я думал, что смогу добиться этого с pyarrow.parquet.ParquetDataset
, но, похоже, это не так. Вот небольшой пример, чтобы проиллюстрировать то, что я хочу.
Чтобы создать случайный набор данных:
from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob
import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset
def get_partitions(basepath, partitions):
"""Generate directory hierarchy for a paritioned dataset
data
├── part1=foo
│ └── part2=True
├── part1=foo
│ └── part2=False
├── part1=bar
│ └── part2=True
└── part1=bar
└── part2=False
"""
path_tmpl = '/'.join(['{}={}'] * len(partitions)) # part=value
path_tmpl = '{}/{}'.format(basepath, path_tmpl) # part1=val/part2=val
parts = [product([part], vals) for part, vals in partitions.items()]
parts = [i for i in product(*parts)]
return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]
partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
# 3 columns, 5 rows
data = [pa.array(np.random.rand(5)) for i in range(3)]
table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
os.makedirs(part, exist_ok=True)
out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
table.schema, flavor='spark')
out.write_table(table)
out.close()
Я хочу прочитать все значения для первого раздела и только True для раздела 2. С pandas.read_parquet
это невозможно, я должен всегда читать весь столбец. Я пробовал следующее с pyarrow
:
parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()
Это тоже не работает:
>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')
Я легко могу сделать это в pyspark
вот так:
def get_spark_session_ctx(appName):
"""Get or create a Spark Session, and the underlying Context."""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appName).getOrCreate()
sc = spark.sparkContext
return (spark, sc)
spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()
Как вы можете видеть ниже:
>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')
Можно ли это сделать с помощью pyarrow
или pandas
, или мне нужна специальная реализация?
Обновление: По запросу Уэса, теперь это находится в JIRA а>.