Я пытаюсь распараллелить свой конвейер ввода, используя tf.data.Dataset
и TFRecordDataset
.
files = tf.data.Dataset.list_files("./data/*.avro")
dataset = tf.data.TFRecordDataset(files, num_parallel_reads=16)
dataset = dataset.apply(tf.contrib.data.map_and_batch(
preprocess_fn, 512, num_parallel_batches=16) )
Я не уверен, как написать preprocess_fn
, если входные данные представляют собой файл AVRO (который похож на JSON).
В настоящее время я использую tf.data.Dataset.from_generator
и скармливаю ему записи avro, проанализированные pyavroc
или подобными читателями avro. Но я не уверен, как распараллелить это, поскольку метод from_generator
не имеет доступной опции num_parallel_reads
.
def gen():
for file in all_avro_files:
x, y = read_local_avro_data(file)
for i, sample in enumerate( x ):
yield sample, y[i]
dataset = tf.data.Dataset.from_generator( gen,
(tf.float32, tf.float64),
( tf.TensorShape([13000]), tf.TensorShape([])
)
)
Чтение файла за файлом явно является узким местом, и я вижу, что все ядра ожидают данных после исчерпания данных из предыдущего пакета.
Как оптимизировать любой из подходов?