У меня есть файл, мне нужна карта/уменьшенная, где для вывода нужна сумма и максимум даты. У меня работает часть суммы, однако я не уверен, как включить максимальную дату как часть сокращенного вывода.
Входные данные выглядят примерно так:
ID1, ID2, date, count
3000, 001, 2014-12-30 18:00:00, 2
3000, 001, 2015-01-01 10:00:00, 1
3000, 002, 2014-11-18 12:53:00, 5
3000, 002, 2014-12-20 20:14:00, 3
Мой преобразователь объединяет ID1 + ID2, чтобы они были сгруппированы. Его вывод выглядит следующим образом:
key (ID1|ID2), value (count)
3000|001, 2
3000|001, 1
3000|002, 5
3000|002, 3
Выход редуктора выглядит так:
key (ID1|ID2), value (sum)
3000|001, 3
3000|002, 8
То, что мне действительно нужно, выводится следующим образом:
key (ID1|ID2), value (sum), date (max)
3000|001, 3, 2015-01-01 10:00:00
3000|002, 8, 2014-12-20 20:14:00
Маппер и редьюсер написаны на Ruby, однако я возьму рабочий пример, написанный на Python (переведу на Ruby).
Вот код картографа:
require 'csv'
pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')
Dir.glob(pattern).each do |file|
CSV.foreach(file, {col_sep: "\t", headers: false}) do |row|
puts [
"#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
row[7] # value = count
].join("\t")
end
end
И редуктор:
prev_key = nil
key_total = 0
ARGF.each do |line|
line = line.chomp
next unless line
(key, value) = line.split("\t")
# check for new key
if prev_key && key != prev_key && key_total > 0
# output total for previous key
puts [prev_key, key_total].join("\t")
# reset key total for new key
prev_key = key
key_total = 0
elsif !prev_key
prev_key = key
end
# add to count for this current key
key_total += value.to_i
end
# this is to catch the final counts after all records have been received
puts [prev_key, key_total].join("\t")
ОБНОВЛЕНИЕ
Вот новый преобразователь и редуктор, основанный на предложении из принятого ответа:
картограф:
require 'csv'
pattern = File.join(File.expand_path('data', File.dirname(__FILE__)), '*.txt')
Dir.glob(pattern).each do |file|
CSV.foreach(file, {col_sep: "\t", headers: false}) do |row|
date_time = "#{row[0]} #{row[1]}:00:00#{row[2]}" # %Y-%m-%d %H:%M:%S%z
puts [
"#{row[6]}|#{row[3].rjust(8, '0')}", # key = ID1 | ID2
"#{row[7]}|#{date_time}", # value = count | date_time
].join("\t")
end
end
редуктор:
require 'date'
prev_key = nil
key_total = 0
dates = []
ARGF.each do |line|
line = line.chomp
next unless line
(key, values) = line.split("\t")
(value, date_time) = values.split('|')
# check for new key
if prev_key && key != prev_key && key_total > 0
# output total for previous key
puts [prev_key.split('|'), key_total, dates.max].join("\t")
# reset key total for new key
prev_key = key
key_total = 0
# reset dates array for new key
dates.clear
elsif !prev_key
prev_key = key
end
# add date to array for this current key
dates << DateTime.strptime(date_time, '%Y-%m-%d %H:%M:%S%z')
# add to count for this current key
key_total += value.to_i
end
# this is to catch the final counts after all records have been received
puts [prev_key.split('|'), key_total, dates.max].join("\t")