Я хотел бы выполнять потоковую передачу из файлов tar-gzip (tgz), которые включают мои фактические сохраненные данные в формате CSV.
Мне уже удалось организовать структурированную потоковую передачу с помощью spark 2.2, когда мои данные поступают в виде CSV-файлов, но на самом деле данные поступают в виде CSV-файлов, сжатых с помощью gzip.
Есть ли способ, которым триггер, созданный структурированной потоковой передачей, выполняет распаковку перед обработкой потока CSV?
Код, который я использую для обработки файлов, таков:
val schema = Encoders.product[RawData].schema
val trackerData = spark
.readStream
.option("delimiter", "\t")
.schema(schema)
.csv(path)
val exceptions = rawCientData
.as[String]
.flatMap(extractExceptions)
.as[ExceptionData]
произвел вывод, как и ожидалось, когда путь указывает на файлы csv. Но я хотел бы использовать файлы tar gzip. Когда я пытаюсь поместить эти файлы по указанному пути, я не получаю никаких исключений, и пакетный вывод сообщает мне
"sources" : [ {
"description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
"startOffset" : null,
"endOffset" : {
"logOffset" : 0
},
"numInputRows" : 1095,
"processedRowsPerSecond" : 211.0233185584891
} ],
Но я не получаю никаких фактических данных. Консольная мойка выглядит так:
+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+