В настоящее время я работаю над заданием mapreduce, используя ошпаривание. Я пытаюсь установить порог, основываясь на том, сколько раз я вижу определенное значение среди строк в моем typedpipe. Например, если бы у меня были эти строки в моей typedpipe:
Колонка 1 | Колонка 2
привет | 'Привет'
'hi' | 'ho'
'hi' | 'ho'
'пока' | 'до свидания'
Я хотел бы добавить к каждой строке частоту, которую я видел в столбце 1 и столбце 2 в каждой строке. Это означает, что вывод будет выглядеть так:
Колонка 1 | Колонка 2 | Столбец 1 Частота | Столбец 2 Частота
привет | 'эй' | 3 | 1
'hi' | 'ho' | 3 | 2
'hi' | 'ho' | 3 | 2
'пока' | 'пока' | 1 | 1
В настоящее время я делаю это, группируя типизированный канал по каждому столбцу, например так:
val key2Freqs = input.groupBy('key2) {
_.size('key2Freq)
}.rename('key2 -> 'key2Right).project('key2Right, 'key2Freq);
Затем присоедините исходный ввод с помощью key2Freqs следующим образом:
.joinWithSmaller('key2 -> 'key2Right, key2Freqs, joiner = new LeftJoin)
Однако это очень медленно и кажется мне довольно неэффективным для того, что по сути является довольно простой задачей. Это становится особенно длинным, потому что у меня есть 6 разных ключей, для которых я хочу получить эти значения, и в настоящее время я сопоставляю и присоединяюсь 6 раз в своей работе. Должен быть лучший способ сделать это, верно?