Мое приложение Spark получает двоичные данные от Kafka. Данные отправляются в Kafka в виде байтового прото-сообщения. Прото-сообщение:
message Any {
string type_url=1;
bytes value=2;
}
С помощью библиотеки ScalaPB я могу десериализовать любое сообщение в исходный формат. Как десериализовать значение из байтов в читаемый формат? SerializationUtils не работает. Вот как сообщение Any выглядит после десериализации.
#+-----------------------------------------|
#| type_url | value |
#+-----------------------------------------|
#|type.googleapis.c...|[0A 8D D8 04 0A 1...|
#+-----------------------------------------|
Значение по-прежнему имеет байтовый формат. После десериализации с помощью SerializationUtils данные неверны.
#+-----------------+
#|value |
#+-----------------+
#|2020-09-04T10:...|
#+-----------------+
Есть ли другая альтернатива? Есть ли способ десериализовать байты в String, Struct или String Json?
Я использую пример ScalaPBs с udf для десериализации байтов в любое сообщение.
val parseCloud = ProtoSQL.udf { bytes: Array[Byte] => CloudEvent.parseFrom(bytes) }
UDF с SerializationUtils для значения bytes выглядит следующим образом.
val parseBytes = ProtoSQL.udf {bytes: Array[Byte] => deserialize(bytes)}