Как мне получить фрейм данных или запись в базу данных из TFX BulkInferrer?

Я новичок в TFX, но у меня есть явно работающий конвейер ML, который должен использоваться через BulkInferrer. Кажется, что это производит вывод исключительно в формате Protobuf, но, поскольку я выполняю массовый вывод, я хочу вместо этого передать результаты в базу данных. (Вывод DB кажется, что он должен быть по умолчанию для массового вывода, поскольку как массовый вывод, так и доступ к базе данных используют преимущества распараллеливания ... но Protobuf - это сериализованный формат для каждой записи.)

Я предполагаю, что мог бы использовать что-то вроде Parquet-Avro-Protobuf, чтобы выполнить преобразование (хотя это на Java, а остальная часть конвейера на Python), или я мог бы написать что-нибудь сам, чтобы обрабатывать все сообщения protobuf одно за другим, преобразовать их в JSON, десериализуйте JSON в список dicts и загрузите dict в Pandas DataFrame или сохраните его как набор пар ключ-значение, которые я рассматриваю как одноразовую БД ... но это звучит как много работы и боли, связанные с распараллеливанием и оптимизацией для очень распространенного случая использования. Определение сообщения Protobuf верхнего уровня - это PredictionLog от Tensorflow. .

Это должно быть обычным вариантом использования, потому что TensorFlowModelAnalytics функционирует как этот использует фреймы данных Pandas. Я бы предпочел иметь возможность писать напрямую в БД (желательно в Google BigQuery) или в файл Parquet (поскольку Parquet / Spark, кажется, распараллеливает лучше, чем Pandas), и, опять же, они кажутся обычными вариантами использования, но я примеров не нашел. Может, я неправильно использую поисковые запросы?

Я также посмотрел на PredictExtractor , поскольку извлечение прогнозов похоже на то, что я хочу ... но в официальной документации ничего не говорится о том, как предполагается использовать этот класс. Я думал, что TFTransformOutput звучит как многообещающий глагол , но вместо этого это существительное.

Я явно упускаю здесь что-то фундаментальное. Есть ли причина, по которой никто не хочет хранить результаты BulkInferrer в базе данных? Есть ли опция конфигурации, которая позволяет мне записывать результаты в БД? Возможно, я хочу добавить ParquetIO или Экземпляр BigQueryIO в конвейер TFX ? (В документации TFX говорится, что он использует Beam под капотом, но это мало что говорит о как я должен использовать их вместе.) Но синтаксис в этих документах выглядит настолько отличным от моего кода TFX, что я не уверен, совместимы ли они?

Помощь?


person Sarah Messer    schedule 31.12.2020    source источник


Ответы (2)


(Скопировано из связанного выпуска для большей наглядности)

После некоторого покопания, вот альтернативный подход, который предполагает незнание feature_spec заранее. Сделайте следующее:

  • Настройте BulkInferrer для записи в output_examples, а не в inference_result, добавив output_example_spec в конструкцию компонента.
  • Добавьте компоненты StatisticsGen и SchemaGen в основной конвейер сразу после BulkInferrer, чтобы сгенерировать схему для вышеупомянутого output_examples
  • Используйте артефакты из SchemaGen и BulkInferrer, чтобы прочитать TFRecords и сделать все необходимое.
bulk_inferrer = BulkInferrer(
     ....
     output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
         output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
             predict_output=bulk_inferrer_pb2.PredictOutput(
                 output_columns=[bulk_inferrer_pb2.PredictOutputCol(
                     output_key='original_label_name',
                     output_column='output_label_column_name', )]))]
     ))

 statistics = StatisticsGen(
     examples=bulk_inferrer.outputs.output_examples
 )

 schema = SchemaGen(
     statistics=statistics.outputs.output,
 )

После этого можно делать следующее:

import tensorflow as tf
from tfx.utils import io_utils
from tensorflow_transform.tf_metadata import schema_utils

# read schema from SchemaGen
schema_path = '/path/to/schemagen/schema.pbtxt'
schema_proto = io_utils.SchemaReader().read(schema_path)
spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec

# read inferred results
data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')

# parse dataset with spec
def parse(raw_record):
    return tf.io.parse_example(raw_record, spec)

dataset = dataset.map(parse)

На этом этапе набор данных аналогичен любому другому проанализированному набору данных, поэтому нетрудно записать CSV, или таблицу BigQuery, или что-то еще оттуда. Это определенно помогло нам в ZenML с нашим BatchInferencePipeline.

person Hamza Tahir    schedule 31.01.2021
comment
Голосование за поддержку, потому что этот подход имеет смысл, хорошо задокументирован и, похоже, будет работать. Я не буду принимать его, пока не проверю, что это работает. - person Sarah Messer; 01.02.2021
comment
Спасибо - надеюсь, это сработает - person Hamza Tahir; 05.02.2021

Отвечая на мой собственный вопрос здесь, чтобы задокументировать то, что мы сделали, хотя я думаю, что ответ @Hamza Tahir ниже объективно лучше. Это может предоставить возможность для других ситуаций, когда необходимо изменить работу готового компонента TFX. Хотя это хакерство:

Мы скопировали и отредактировали файл tfx / components / bulk_inferrer / executor.py, заменив это преобразование во внутреннем конвейере метода _run_model_inference():

| 'WritePredictionLogs' >> beam.io.WriteToTFRecord(
             os.path.join(inference_result.uri, _PREDICTION_LOGS_FILE_NAME),
             file_name_suffix='.gz',
             coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog)))

с этим:

| 'WritePredictionLogsBigquery' >> beam.io.WriteToBigQuery(
           'our_project:namespace.TableName',
           schema='SCHEMA_AUTODETECT',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           custom_gcs_temp_location='gs://our-storage-bucket/tmp',
           temp_file_format='NEWLINE_DELIMITED_JSON',
           ignore_insert_ids=True,
       )

(Это работает, потому что при импорте компонента BulkInferrer работа каждого узла передается этим исполнителям, работающим на рабочих узлах, а TFX копирует свою собственную библиотеку на эти узлы. Он не копирует все из библиотек пользовательского пространства, поэтому мы не могли просто создать подкласс BulkInferrer и импортировать нашу собственную версию.)

Нам нужно было убедиться, что таблица в 'our_project:namespace.TableName' имеет схему, совместимую с выходными данными модели, но не нужно переводить эту схему в JSON / AVRO.

Теоретически моя группа хотела бы сделать пул-реквест с TFX, построенным вокруг этого, но пока мы жестко кодируем пару ключевых параметров, и у нас нет времени, чтобы перевести это в реальное публичное / производственное состояние.

person Sarah Messer    schedule 05.02.2021
comment
Привет, @Sarah Messer, попробуйте ZenML (github.com/maiot-io/zenml) - Судя по вашим требованиям, это может быть полезно. Не стесняйтесь обращаться напрямую - мы ищем варианты использования, с которыми можно работать, поскольку они находятся на ранних этапах разработки. - person Hamza Tahir; 06.02.2021