Как загрузить файлы tar.gz в потоковые наборы данных?

Я хотел бы выполнять потоковую передачу из файлов tar-gzip (tgz), которые включают мои фактические сохраненные данные в формате CSV.

Мне уже удалось организовать структурированную потоковую передачу с помощью spark 2.2, когда мои данные поступают в виде CSV-файлов, но на самом деле данные поступают в виде CSV-файлов, сжатых с помощью gzip.

Есть ли способ, которым триггер, созданный структурированной потоковой передачей, выполняет распаковку перед обработкой потока CSV?

Код, который я использую для обработки файлов, таков:

val schema = Encoders.product[RawData].schema
val trackerData = spark
  .readStream
  .option("delimiter", "\t")
  .schema(schema)
  .csv(path)
val exceptions = rawCientData
  .as[String]
  .flatMap(extractExceptions)
  .as[ExceptionData]

произвел вывод, как и ожидалось, когда путь указывает на файлы csv. Но я хотел бы использовать файлы tar gzip. Когда я пытаюсь поместить эти файлы по указанному пути, я не получаю никаких исключений, и пакетный вывод сообщает мне

  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 1095,
    "processedRowsPerSecond" : 211.0233185584891
  } ],

Но я не получаю никаких фактических данных. Консольная мойка выглядит так:

+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+

person Matthias Mueller    schedule 30.12.2017    source источник


Ответы (2)


Я не думаю, что чтение файлов tar.gz возможно в Spark (см. Чтение целых текстовых файлов из сжатие в Spark или поддержку gzip в Spark для некоторых идей).

Spark поддерживает файлы gzip, но они не рекомендуются, так как не могут быть разделены и приводят к созданию одного раздела (что, в свою очередь, делает Spark практически бесполезным).

Чтобы загружать сжатые файлы в Spark Structured Streaming, вы должны указать шаблон пути, чтобы файлы включались в загрузку, скажем, zsessionlog*.csv.gz или подобное. В противном случае csv загружает только файлы CSV.

Если вы настаиваете на том, чтобы Spark Structured Streaming обрабатывал файлы tar.gz, вы можете написать собственные данные потоковой передачи Source, чтобы выполнить не-tar.gz.

Учитывая, что файлы gzip не рекомендуются в качестве формата данных в Spark, вся идея использования Spark Structured Streaming не имеет особого смысла.

person Jacek Laskowski    schedule 30.12.2017
comment
Спасибо, Яцек, я проголосовал за ответ, но из-за моего низкого уровня репутации он в настоящее время не учитывается. BTW: Что касается Spark и использования файлов gzip, это не вопрос петиции, когда у вас есть › 200 файлов gzip. Вариантов для разбиения достаточно :-) - person Matthias Mueller; 31.12.2017
comment
@MatthiasMueller Давайте поговорим об этом по другому вопросу (и тогда давайте сделаем его более полным) --› stackoverflow.com/q/47604184/ 1305344. По рукам? - person Jacek Laskowski; 03.01.2018
comment
Да, было бы здорово, но я не могу добавить комментарий к другому вопросу или ответу, потому что у меня менее 50 репутаций :-( - person Matthias Mueller; 03.01.2018
comment
Какие тогда вопросы? Давайте вместе разработаем их здесь, и я обновлю правильный вопрос @ stackoverflow.com/q/47604184/1305344. - person Jacek Laskowski; 03.01.2018
comment
Хорошо, начнем: когда я реализую org.apache.spark.sql.execution.streaming.Sourceкакой будет минималистичная схема? StructType(StructField("value", StringType) :: Nil) И какова семантика getOffset и getBatch? - person Matthias Mueller; 04.01.2018
comment
@JacekLaskowski: Просто хочу повторить ответ Матиаса Мюллера. Хотя один файл gzip приводит к одному разделу при чтении как обычный DF, но это поток чтения, и в него поступает несколько файлов, и обычно каждый файл имеет небольшой размер в потоковой передаче, поэтому это не имеет значения - person James Nguyen; 23.08.2019

Я решил часть чтения файлов .tar.gz (.tgz) следующим образом: сжатие-кодек-для-hadoop/" rel="nofollow noreferrer">сайт Я создал свой собственный кодек TGZ

final class DecompressTgzCodec extends CompressionCodec {
  override def getDefaultExtension: String = ".tgz"

  override def createOutputStream(out: OutputStream): CompressionOutputStream = ???
  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???
  override def createCompressor(): Compressor = ???
  override def getCompressorType: Class[_ <: Compressor] = ???

  override def createInputStream(in: InputStream): CompressionInputStream = {
    new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))
  }
  override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream = createInputStream(in)

  override def createDecompressor(): Decompressor = null
  override def getDecompressorType: Class[_ <: Decompressor] = null

  final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
    def updateStream(): Unit = {
      // still have data in stream -> done
      if (in.available() <= 0) {
        // create stream content from following tar elements one by one
        in.getNextTarEntry()
      }
    }

    override def read: Int = {
      checkStream()
      updateStream()
      in.read()
    }

    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      checkStream()
      updateStream()
      in.read(b, off, len)
    }

    override def resetState(): Unit = {}
  }
}

И зарегистрировал его для использования искрой.

val conf = new SparkConf()
conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config(conf)
  .appName("Streaming Example")
  .getOrCreate()

Работает именно так, как я хотел.

person Matthias Mueller    schedule 07.01.2018
comment
можете ли вы поделиться местоположением github, если оно существует. Я хотел бы проверить все импорты и зависимости? - person Douglas M; 31.12.2020