Взяв текстовый файл в формате ndjson, следующий код дает то, что я ожидал. Файл ndjson с кавычками .USD dict не вложен, а исходный элемент кавычек удален.
def unnest_quotes(element):
element['USDquotes'] = element['quotes']['USD']
del element['quotes']
return element
p = beam.Pipeline(options=pipeline_options)
ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
MapFormattedJson = ReadJson | 'Map Function' >> beam.Map(unnest_quotes)
MapFormattedJson | 'Write Map Output' >> WriteToText(known_args.output,coder=JsonCoder())
Однако, когда я пытаюсь добиться того же с помощью ParDo, я не понимаю его поведения.
class UnnestQuotes(beam.DoFn):
def process(self,element):
element['USDquotes'] = element['quotes']['USD']
del element['quotes']
return element
p = beam.Pipeline(options=pipeline_options)
ReadJson = p | ReadFromText(known_args.input,coder=JsonCoder())
ClassFormattedJson = ReadJson | 'Pardo' >> beam.ParDo(UnnestQuotes())
ClassFormattedJson | 'Write Class Output' >> WriteToText(known_args.output,coder=JsonCoder())
Это создает файл с каждым ключом dict в отдельной строке без значения, как показано ниже.
"last_updated"
"name"
"symbol"
"rank"
"total_supply"
"max_supply"
"circulating_supply"
"website_slug"
"id"
"USDquotes"
Это как если бы PCollection, созданная функцией Map, является полным dict, тогда как Pardo создает PCollection для каждого ключа.
Я знаю, что могу просто использовать функцию карты, но мне нужно понимать это поведение, когда мне понадобится использовать ParDo в будущем.