Как посчитать частоту столбцов в строке в typedpipe в scalding?

В настоящее время я работаю над заданием mapreduce, используя ошпаривание. Я пытаюсь установить порог, основываясь на том, сколько раз я вижу определенное значение среди строк в моем typedpipe. Например, если бы у меня были эти строки в моей typedpipe:

Колонка 1 | Колонка 2

привет | 'Привет'

'hi' | 'ho'

'hi' | 'ho'

'пока' | 'до свидания'

Я хотел бы добавить к каждой строке частоту, которую я видел в столбце 1 и столбце 2 в каждой строке. Это означает, что вывод будет выглядеть так:

Колонка 1 | Колонка 2 | Столбец 1 Частота | Столбец 2 Частота

привет | 'эй' | 3 | 1

'hi' | 'ho' | 3 | 2

'hi' | 'ho' | 3 | 2

'пока' | 'пока' | 1 | 1

В настоящее время я делаю это, группируя типизированный канал по каждому столбцу, например так:

  val key2Freqs = input.groupBy('key2) {
    _.size('key2Freq)
  }.rename('key2 -> 'key2Right).project('key2Right, 'key2Freq);

Затем присоедините исходный ввод с помощью key2Freqs следующим образом:

  .joinWithSmaller('key2 -> 'key2Right, key2Freqs, joiner = new LeftJoin)

Однако это очень медленно и кажется мне довольно неэффективным для того, что по сути является довольно простой задачей. Это становится особенно длинным, потому что у меня есть 6 разных ключей, для которых я хочу получить эти значения, и в настоящее время я сопоставляю и присоединяюсь 6 раз в своей работе. Должен быть лучший способ сделать это, верно?


person Lucas    schedule 11.02.2016    source источник


Ответы (1)


Если количество различных значений в каждом столбце достаточно мало, чтобы поместить их все в память, вы можете .map ваши столбцы в Map[String,Int], а затем .groupAll.sum подсчитать их все за один раз (я использую нотацию «типизированный API», не надо не совсем помню, как именно это делается в полях API, но вы поняли). Вам нужно будет использовать MapMonoid из algebird или просто напишите свой собственный, если вы не хотите добавлять зависимость для этой вещи, это несложно. Затем вы получите канал, содержащий одну запись для результирующего Map. Теперь вы можете получить исходную трубу и выполнить .crossWithTiny, чтобы ввести в нее карту с подсчетами, а затем .map, чтобы извлечь отдельные подсчеты.

В противном случае, если вы не можете удержать все это в памяти, то то, что вы сейчас делаете, кажется единственным способом... если только вы на самом деле не ищете приближение "лучших хитов", а не точные подсчеты всего вселенная... в этом случае посмотрите SketchMap от Algebird.

person Dima    schedule 12.02.2016