Я почти закончил свой проект Scalding, который использует Type Safe API вместо Fields API. Последняя проблема, которая остается для меня в общей настройке проекта, — это интеграционные тесты всего задания Scalding (я закончил модульные тесты для шаблона Type Safe External Operations, ура!). Это означает запустить всю работу и протестировать вывод различных приемников моей работы.
Однако происходит нечто весьма своеобразное. В моем
typedSink { scala.collection.mutable.Buffer[] => Unit }
Похоже, что моя программа не видит буфер и ничего с ним не делает, поэтому интеграционный тест всегда проходит, даже если не должен. Ниже приведена как сама работа, так и тест, чтобы помочь понять, что происходит:
object MyJob {
val inputArgPath = "input"
val validOutputArgPath = "validOutput"
val invalidOutputArgPath = "invalidOutput"
}
class MyJob(args: Args) extends Job(args) {
import OperationWrappers._
implicit lazy val uId: Some[UniqueID] = Some(UniqueID.getIDFor(flowDef))
val inputPath: String = args(MyJob.inputArgPath)
val validOutputPath: String = args(MyJob.validOutputArgPath)
val invalidOutputPath: String = args(MyJob.invalidOutputArgPath)
val eventInput: TypedPipe[(LongWritable, Text)] = this.mode match {
case m: HadoopMode => TypedPipe.from(WritableSequenceFile[LongWritable, Text](inputPath))
case _ => TypedPipe.from(TypedTsv[(LongWritable, Text)](inputPath))
}
def returnOutputPipe(outputString: String): FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = {
val eventOutput: FixedPathSource with TypedSink[(LongWritable, Text)] with TypedSource[(LongWritable, Text)] = this.mode match {
case m: HadoopMode => WritableSequenceFile[LongWritable, Text](outputString)
case _ => TypedTsv[(LongWritable, Text)](outputString)
}
eventOutput
}
val validatedEvents: TypedPipe[(LongWritable, Either[Text, Event])] = eventInput.convertJsonToEither.forceToDisk
validatedEvents.removeInvalidTuples.removeEitherWrapper.write(returnOutputPipe(invalidOutputPath))
validatedEvents.keepValidTuples.removeEitherWrapper.write(returnOutputPipe(validOutputPath))
override protected def handleStats(statsData: CascadingStats) = {
//This is code to handle counters.
}
}
Ниже приведен интеграционный тест:
class MyJobTest extends FlatSpec with Matchers {
private val LOG = LoggerFactory.getLogger(classOf[MyJobTest])
val validEvents: List[(LongWritable, Text)] = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/validEvents.txt")).getLines().toList.map(s => {
val eventText = new Text
val typedFields = s.split(Constants.TAB)
eventText.set(typedFields(1))
(new LongWritable(typedFields(0).toLong), eventText)
})
"Integrate-Test: My Job" should "run test" in {
LOG.info("Before Job Test starts.")
JobTest(classOf[MyJob].getName)
.arg(MyJob.inputArgPath, "input")
.arg(MyJob.invalidOutputArgPath, "invalidOutput")
.arg(MyJob.validOutputArgPath, "validOutput")
.source(TypedTsv[(LongWritable, Text)]("input"), validEvents)
.typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("invalidOutput")) {
(buffer: mutable.Buffer[(LongWritable, Text)]) => {
LOG.info("This is inside the buffer1.")
buffer.size should equal(1000000)
}
}
.typedSink[(LongWritable, Text)](TypedTsv[(LongWritable, Text)]("validOutput")) {
(buffer: mutable.Buffer[(LongWritable, Text)]) => {
LOG.info("This is inside the buffer2.")
buffer.size should equal(1000000000)
}
}
.run
.finish
}
}
И, наконец, вывод:
[INFO] --- maven-surefire-plugin:2.7:test (default-test) @ MyJob ---
[INFO] Tests are skipped.
[INFO]
[INFO] --- scalatest-maven-plugin:1.0:test (test) @ MyJob ---
Discovery starting.
16/01/28 10:06:42 INFO jobs.MyJobTest: Before Job Test starts.
16/01/28 10:06:42 INFO property.AppProps: using app.id: A98C9B84C79348F8A7784D8247410C13
16/01/28 10:06:42 INFO util.Version: Concurrent, Inc - Cascading 2.6.1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] starting
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] source: MemoryTap["NullScheme"]["0.2996348736498404"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] sink: MemoryTap["NullScheme"]["0.8393418014297485"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] sink: MemoryTap["NullScheme"]["0.20643450953780684"]
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] parallel execution is enabled: true
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] starting jobs: 1
16/01/28 10:06:42 INFO flow.Flow: [com.myCompany.myProject.c...] allocating threads: 1
16/01/28 10:06:42 INFO flow.FlowStep: [com.myCompany.myProject.c...] starting step: local
16/01/28 10:06:42 INFO util.Version: HV000001: Hibernate Validator 5.0.3.Final
Dumping custom counters:
rawEvent 6
validEvent 6
16/01/28 10:06:42 INFO jobs.MyJob: RawEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: ValidEvents: 6
16/01/28 10:06:42 INFO jobs.MyJob: InvalidEvents: 0
16/01/28 10:06:42 INFO jobs.MyJob: Job has valid counters and is exiting successfully.
Как видите, Logger регистрирует «Перед запуском теста работы», но внутри частей typedSink ничего не происходит. Это расстраивает, потому что мой код похож на весь другой код, который я вижу для этого, но он не работает. Он должен провалить тест, но все работает успешно. Кроме того, Logger внутри typedSink никогда не выводит данные. Наконец, если вы посмотрите на вывод, вы увидите, что счетчики обрабатываются правильно, поэтому задание выполняется до завершения. Я провел много часов, пробуя новые вещи, но ничего не работает. Надеюсь, сообщество сможет мне помочь. Спасибо!
mode
ради эксперимента и посмотрите, поможет ли это. Я не помню, как это, но не удивлюсь, еслиJobTest
будет работать в режиме Hadoop. - person Dima   schedule 28.01.2016