Kinesis Firehose помещает объекты JSON в S3 без запятой-разделителя

Перед отправкой данных я использую JSON.stringify для данных, и это выглядит так

{"data": [{"key1": value1, "key2": value2}, {"key1": value1, "key2": value2}]}

Но как только он проходит через AWS API Gateway и Kinesis Firehose переводит его на S3, он выглядит так:

    {
     "key1": value1, 
     "key2": value2
    }{
     "key1": value1, 
     "key2": value2
    }

Запятая-разделитель между объектами JSON исчезла, но мне нужно, чтобы она правильно обрабатывала данные.

Шаблон в шлюзе API:

#set($root = $input.path('$'))
{
    "DeliveryStreamName": "some-delivery-stream",
    "Records": [
#foreach($r in $root.data)
#set($data = "{
    ""key1"": ""$r.value1"",
    ""key2"": ""$r.value2""
}")
    {
        "Data": "$util.base64Encode($data)"
    }#if($foreach.hasNext),#end
#end
    ]
}



Ответы (3)


Недавно у меня была такая же проблема, и единственные ответы, которые я смог найти, заключались в том, чтобы просто добавлять разрывы строк ("\n") в конец каждого сообщения JSON всякий раз, когда вы отправляли их в поток Kinesis, или использовать необработанный Некоторый метод декодера JSON, который может обрабатывать объединенные объекты JSON без разделителей.

Я разместил решение кода Python, которое можно найти здесь, в соответствующем сообщении о переполнении стека: https://stackoverflow.com/a/49417680/1546785

person Tom Chapin    schedule 22.03.2018

После того, как AWS Firehose выгрузит объекты JSON в s3, вполне возможно прочитать отдельные объекты JSON из файлов.

Используя Python, вы можете использовать функцию raw_decode из пакета json.

from json import JSONDecoder, JSONDecodeError
import re
import json
import boto3

NOT_WHITESPACE = re.compile(r'[^\s]')

def decode_stacked(document, pos=0, decoder=JSONDecoder()):
    while True:
        match = NOT_WHITESPACE.search(document, pos)
        if not match:
            return
        pos = match.start()

        try:
            obj, pos = decoder.raw_decode(document, pos)
        except JSONDecodeError:
            # do something sensible if there's some error
            raise
        yield obj

s3 = boto3.resource('s3')

obj = s3.Object("my-bukcet", "my-firehose-json-key.json")
file_content = obj.get()['Body'].read()
for obj in decode_stacked(file_content):
    print(json.dumps(obj))
    #  { "key1":value1,"key2":value2}
    #  { "key1":value1,"key2":value2}

источник: https://stackoverflow.com/a/50384432/1771155

Используя Glue/Pyspark, вы можете использовать

import json

rdd = sc.textFile("s3a://my-bucket/my-firehose-file-containing-json-objects")
df = rdd.map(lambda x: json.loads(x)).toDF()
df.show()

источник: https://stackoverflow.com/a/62984450/1771155

person Vincent Claes    schedule 19.07.2020

Один из возможных подходов — настроить обработку данных для потока доставки Kinesis Firehose, добавив функцию Lambda в качестве обработчика данных, которая будет выполняться перед окончательной доставкой данных в корзину S3.

DeliveryStream:
  ...
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      ...
      BucketARN: !GetAtt MyDeliveryBucket.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !GetAtt MyTransformDataLambdaFunction.Arn
            Type: Lambda
    ...

А в функции Lambda убедитесь, что '\n' добавлено к строке JSON записи, см. ниже функцию Lambda myTransformData.ts в Node.js:

import {
  FirehoseTransformationEvent,
  FirehoseTransformationEventRecord,
  FirehoseTransformationHandler,
  FirehoseTransformationResult,
  FirehoseTransformationResultRecord,
} from 'aws-lambda';

const createDroppedRecord = (
  recordId: string
): FirehoseTransformationResultRecord => {
  return {
    recordId,
    result: 'Dropped',
    data: Buffer.from('').toString('base64'),
  };
};

const processData = (
  payloadStr: string,
  record: FirehoseTransformationEventRecord
) => {
  let jsonRecord;
  // ...
  // Process the orginal payload,
  // And create the record in JSON
  return jsonRecord;
};

const transformRecord = (
  record: FirehoseTransformationEventRecord
): FirehoseTransformationResultRecord => {
  try {
    const payloadStr = Buffer.from(record.data, 'base64').toString();
    const jsonRecord = processData(payloadStr, record);
    if (!jsonRecord) {
      console.error('Error creating json record');
      return createDroppedRecord(record.recordId);
    }
    return {
      recordId: record.recordId,
      result: 'Ok',
      // Ensure that '\n' is appended to the record's JSON string.
      data: Buffer.from(JSON.stringify(jsonRecord) + '\n').toString('base64'),
    };
  } catch (error) {
    console.error('Error processing record ${record.recordId}: ', error);
    return createDroppedRecord(record.recordId);
  }
};

const transformRecords = (
  event: FirehoseTransformationEvent
): FirehoseTransformationResult => {
  let records: FirehoseTransformationResultRecord[] = [];
  for (const record of event.records) {
    const transformed = transformRecord(record);
    records.push(transformed);
  }
  return { records };
};

export const handler: FirehoseTransformationHandler = async (
  event,
  _context
) => {
  const transformed = transformRecords(event);
  return transformed;
};

После установки разделителя новой строки сервисы AWS, такие как Athena, смогут правильно работать с данными записи JSON в корзине S3, а не видит только первую запись JSON.

person Yuci    schedule 11.12.2020