mutable.Buffer не работает с Scalding JobTest for Type Safe API

Я почти закончил свой проект Scalding, который использует Type Safe API вместо Fields API. Последняя проблема, которая остается для меня в общей настройке проекта, — это интеграционные тесты всего задания Scalding (я закончил модульные тесты для шаблона Type Safe External Operations, ура!). Это означает запустить всю работу и протестировать вывод различных приемников моей работы.

Однако происходит нечто весьма своеобразное. В моем

typedSink { scala.collection.mutable.Buffer[] => Unit }

Похоже, что моя программа не видит буфер и ничего с ним не делает, поэтому интеграционный тест всегда проходит, даже если не должен. Ниже приведена как сама работа, так и тест, чтобы помочь понять, что происходит:

object MyJob {
  val inputArgPath = "input"
  val validOutputArgPath = "validOutput"
  val invalidOutputArgPath = "invalidOutput"
}

class MyJob(args: Args) extends Job(args) {

  import OperationWrappers._

  implicit lazy val uId: Some[UniqueID] = Some(UniqueID.getIDFor(flowDef))

  val inputPath: String = args(MyJob.inputArgPath)
  val validOutputPath: String = args(MyJob.validOutputArgPath)
  val invalidOutputPath: String = args(MyJob.invalidOutputArgPath)

  val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
    case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](inputPath))
    case _ => TypedPipe.from(TypedTsv[(LongWritable, Text)](inputPath))
  }

  def returnOutputPipe(outputString: String): FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = {
    val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
      case m: HadoopMode => WritableSequenceFile[LongWritable, Text](outputString)
      case _ => TypedTsv[(LongWritable, Text)](outputString)
    }
    eventOutput
  }

  val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.convertJsonToEither.forceToDisk

  validatedEvents.removeInvalidTuples.removeEitherWrapper.write(returnOutputPipe(invalidOutputPath))
  validatedEvents.keepValidTuples.removeEitherWrapper.write(returnOutputPipe(validOutputPath))

  override protected def handleStats(statsData: CascadingStats) = {
    //This is code to handle counters.
  }
}

Ниже приведен интеграционный тест:

class MyJobTest extends FlatSpec with Matchers {
  private val LOG = LoggerFactory.getLogger(classOf[MyJobTest])

  val validEvents: List[(LongWritable, Text)] = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/validEvents.txt")).getLines().toList.map(s => {
    val eventText = new Text
    val typedFields = s.split(Constants.TAB)
    eventText.set(typedFields(1))
    (new LongWritable(typedFields(0).toLong), eventText)
  })

  "Integrate-Test: My Job" should "run test" in {

    LOG.info("Before Job Test starts.")
    JobTest(classOf[MyJob].getName)
      .arg(MyJob.inputArgPath, "input")
      .arg(MyJob.invalidOutputArgPath, "invalidOutput")
      .arg(MyJob.validOutputArgPath, "validOutput")
      .source(TypedTsv[(LongWritable, Text)]("input"), validEvents)
      .typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("invalidOutput")) {
      (buffer: mutable.Buffer[(LongWritable, Text)]) => {
        LOG.info("This is inside the buffer1.")
        buffer.size should equal(1000000)
      }
    }
      .typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("validOutput")) {
      (buffer: mutable.Buffer[(LongWritable, Text)]) => {
        LOG.info("This is inside the buffer2.")
        buffer.size should equal(1000000000)
      }
    }
      .run
      .finish
  }
}

И, наконец, вывод:

[INFO] --- maven-surefire-plugin:2.7:test (default-test) @ MyJob ---
[INFO] Tests are skipped.
[INFO]
[INFO] --- scalatest-maven-plugin:1.0:test (test) @ MyJob ---
Discovery starting.
16/01/28 10:06:42 INFO jobs.MyJobTest: Before Job Test starts.
16/01/28 10:06:42 INFO property.AppProps: using app.id: A98C9B84C79348F8A7784D8247410C13
16/01/28 10:06:42 INFO util.Version: Concurrent, Inc - Cascading 2.6.1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] starting
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  source: MemoryTap["NullScheme"]["0.2996348736498404"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  sink: MemoryTap["NullScheme"]["0.8393418014297485"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  sink: MemoryTap["NullScheme"]["0.20643450953780684"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  parallel execution is enabled: true
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  starting jobs: 1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...]  allocating threads: 1
16/01/28 10:06:42 INFO flow.FlowStep: [com.myCompany.myProject.c...] starting step: local
16/01/28 10:06:42 INFO util.Version: HV000001: Hibernate Validator 5.0.3.Final
Dumping custom counters:
rawEvent    6
validEvent  6
16/01/28 10:06:42 INFO jobs.MyJob: RawEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: ValidEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: InvalidEvents: 0
16/01/28 10:06:42 INFO jobs.MyJob: Job has valid counters and is exiting successfully.

Как видите, Logger регистрирует «Перед запуском теста работы», но внутри частей typedSink ничего не происходит. Это расстраивает, потому что мой код похож на весь другой код, который я вижу для этого, но он не работает. Он должен провалить тест, но все работает успешно. Кроме того, Logger внутри typedSink никогда не выводит данные. Наконец, если вы посмотрите на вывод, вы увидите, что счетчики обрабатываются правильно, поэтому задание выполняется до завершения. Я провел много часов, пробуя новые вещи, но ничего не работает. Надеюсь, сообщество сможет мне помочь. Спасибо!


person PhillipAMann    schedule 28.01.2016    source источник
comment
Попробуйте убрать галочку mode ради эксперимента и посмотрите, поможет ли это. Я не помню, как это, но не удивлюсь, если JobTest будет работать в режиме Hadoop.   -  person Dima    schedule 28.01.2016
comment
Моя команда использовала Fields API с книгой Scalding, и у них не было проблем с проверкой режима. Тем не менее, я попробую это прямо сейчас. Спасибо за ваше предложение!   -  person PhillipAMann    schedule 28.01.2016
comment
Я попробовал ваше предложение и получил это исключение: Причина: com.twitter.scalding.ModeException: Каскадный локальный режим не поддерживается для: com.twitter.scalding.WritableSequenceFileWrappedArray(input), который предполагает, что JobTest использует каскадный локальный режим.   -  person PhillipAMann    schedule 28.01.2016
comment
Хорошо, тогда я не знаю :) Пришло время запустить отладчик?   -  person Dima    schedule 28.01.2016
comment
У меня был разговор с некоторыми разработчиками Scalding в комнате Scalding Gitter. Мне нужно переосмыслить то, как я тестирую, и некоторые из моих первоначальных предположений о работе.   -  person PhillipAMann    schedule 28.01.2016
comment
Можете ли вы опубликовать то, что вы узнали в качестве ответа? Мне было бы интересно это увидеть.   -  person Dima    schedule 29.01.2016
comment
Я разместил свой ответ Дима.   -  person PhillipAMann    schedule 30.01.2016


Ответы (1)