Чтение определенных секций из секционированного набора данных паркета с помощью pyarrow

У меня есть несколько большой (~ 20 ГБ) набор данных в паркетном формате. Я хотел бы прочитать определенные разделы из набора данных, используя pyarrow. Я думал, что смогу добиться этого с pyarrow.parquet.ParquetDataset, но, похоже, это не так. Вот небольшой пример, чтобы проиллюстрировать то, что я хочу.

Чтобы создать случайный набор данных:

from collections import OrderedDict
from itertools import product, chain
from uuid import uuid4
import os
from glob import glob

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow.parquet import ParquetWriter, ParquetDataset


def get_partitions(basepath, partitions):
    """Generate directory hierarchy for a paritioned dataset

    data
    ├── part1=foo
    │   └── part2=True
    ├── part1=foo
    │   └── part2=False
    ├── part1=bar
    │   └── part2=True
    └── part1=bar
        └── part2=False

    """
    path_tmpl = '/'.join(['{}={}'] * len(partitions))  # part=value
    path_tmpl = '{}/{}'.format(basepath, path_tmpl)    # part1=val/part2=val

    parts = [product([part], vals) for part, vals in partitions.items()]
    parts = [i for i in product(*parts)]
    return [path_tmpl.format(*tuple(chain.from_iterable(i))) for i in parts]


partitions = OrderedDict(part1=['foo', 'bar'], part2=[True, False])
parts = get_partitions('data', partitions)
for part in parts:
    # 3 columns, 5 rows
    data = [pa.array(np.random.rand(5)) for i in range(3)]
    table = pa.Table.from_arrays(data, ['a', 'b', 'c'])
    os.makedirs(part, exist_ok=True)
    out = ParquetWriter('{}/{}.parquet'.format(part, uuid4()),
                        table.schema, flavor='spark')
    out.write_table(table)
    out.close()

Я хочу прочитать все значения для первого раздела и только True для раздела 2. С pandas.read_parquet это невозможно, я должен всегда читать весь столбец. Я пробовал следующее с pyarrow:

parts2 = OrderedDict(part1=['foo', 'bar'], part2=[True])
parts2 = get_partitions('data', parts2)
files = [glob('{}/*'.format(dirpath)) for dirpath in parts2]
files = [i for i in chain.from_iterable(files)]
df2 = ParquetDataset(files).read().to_pandas()

Это тоже не работает:

>>> df2.columns
Index(['a', 'b', 'c'], dtype='object')

Я легко могу сделать это в pyspark вот так:

def get_spark_session_ctx(appName):
    """Get or create a Spark Session, and the underlying Context."""
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName(appName).getOrCreate()
    sc = spark.sparkContext
    return (spark, sc)


spark, sc = get_spark_session_ctx('test')
spark_df = spark.read.option('basePath', 'data').parquet(*parts2)
df3 = spark_df.toPandas()

Как вы можете видеть ниже:

>>> df3.columns
Index(['a', 'b', 'c', 'part1', 'part2'], dtype='object')

Можно ли это сделать с помощью pyarrow или pandas, или мне нужна специальная реализация?

Обновление: По запросу Уэса, теперь это находится в JIRA .


person suvayu    schedule 28.12.2017    source источник


Ответы (2)


Вопрос: Как с помощью pyarrow прочитать определенные разделы из многораздельного набора данных паркета?

Ответ: Прямо сейчас нельзя.

Можете ли вы создать JIRA Apache Arrow, запрашивающую эту функцию, на https://issues.apache.org/jira?

Это то, что мы должны иметь возможность поддерживать в API pyarrow, но для его реализации потребуется кто-то. Спасибо

person Wes McKinney    schedule 28.12.2017
comment
Я сделаю это, спасибо. Думаю, мне следует создать запрос функции и для красного, и для записи (если я его не пропустил). Сейчас у меня много свободного времени, и если кто-то меня направит, я мог бы поработать и над реализацией. - person suvayu; 29.12.2017
comment
@wes Как происходит у pyarrow чтение разделенной паркетной таблицы в HDFS? Кажется, что API-интерфейс pyarrow HDFS по-прежнему не может предоставить объект набора данных для метода pq.ParquetDataset(). - person robert; 14.08.2018

Начиная с версии pyarrow 0.10.0, вы можете использовать filters kwarg для выполнения запроса. В вашем случае это будет выглядеть примерно так:

import pyarrow.parquet as pq
dataset = pq.ParquetDataset('path-to-your-dataset', filters=[('part2', '=', 'True'),])
table = dataset.read()

Ссылка

person ji.xu    schedule 01.10.2018
comment
Я знаю, но, к сожалению, это частичная поддержка (только для чтения). Поддержка записи отложена до 0.12.0. Я считаю, что это описано в JIRA. - person suvayu; 03.10.2018
comment
Большое спасибо за эту документацию. Я пытался заставить ParquetDatasetPiece работать, но это было единственное решение, с которым я мог работать. - person Geochem B; 09.01.2019