MapReduce попарное сравнение всех строк в нескольких файлах

Я начинаю использовать mrjob Python для преобразования некоторых из моих давно работающих программ Python в задания MapReduce Hadoop. . У меня есть простые примеры подсчета слов, и я концептуально понимаю пример «текстовой классификации».

Тем не менее, у меня возникли небольшие проблемы с определением шагов, которые мне нужно сделать, чтобы моя проблема заработала.

У меня есть несколько файлов (около 6000), каждый из которых содержит от 2 до 800 строк. В этом случае каждая строка представляет собой простой «сигнал», разделенный пробелами. Мне нужно сравнить корреляцию между каждой строкой в ​​каждом файле и КАЖДОЙ другой строкой во ВСЕХ файлах (включая себя). Затем на основе коэффициента корреляции я выведу результаты.

Пример одного файла:

1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...

Мне нужно вывести каждую СТРОКУ этого файла в паре с КАЖДОЙ ДРУГОЙ СТРОКОЙ из любого другого файла... или я мог бы объединить все файлы в один файл, если это упростит задачу, но мне все равно понадобится попарная итерация.

Я понимаю, как выполнять вычисления и как использовать последний шаг сокращения для агрегирования и фильтрации результатов. Трудность, с которой я столкнулся, заключается в том, как yield все попарные элементы выполнить последовательные шаги, не читая все файлы в одном наборе? Думаю, я мог бы заранее подготовить входной файл, в котором используется itertools.product, но этот файл был бы чрезмерно большим.


person JudoWill    schedule 10.07.2011    source источник
comment
Можете привести примерные данные?   -  person Donald Miner    schedule 11.07.2011
comment
Конечно, просто добавил :)   -  person JudoWill    schedule 11.07.2011
comment
может быть, я мог бы сначала использовать шаг reduce и дать всем строкам один и тот же ключ, чтобы функция получила все строки ... затем использовать все пары от itertools.product до yield с соответствующими ключами. Однако не уверен, что это правильный способ сделать что-то.   -  person JudoWill    schedule 11.07.2011
comment
Если у вас есть алгоритм O(n^2) и n примерно равно 600 000, я бы рекомендовал вам поискать алгоритм O(n).   -  person hughdbrown    schedule 11.07.2011
comment
К сожалению, это уже значительно уменьшилось по сравнению с первоначальным количеством элементов ... но в целом N составляет около 40 000. Я выполнил его на своей единственной машине примерно за ~ 4 дня ... Так что это не выходит за рамки разумности.   -  person JudoWill    schedule 11.07.2011
comment
Опять же, когда ваша первоначальная концепция проблемы приводит к алгоритму с 4-дневным временем выполнения, ответ заключается не в использовании Hadoop/map-reduce, а в поиске алгоритмического изменения для уменьшения сложности проблемы. Я не могу сказать, что понимаю, что вы делаете, но это мое общее наблюдение.   -  person hughdbrown    schedule 11.07.2011
comment
По сути, каждый сигнал поступает из области выровненного генома, где (в этом примере) каждый столбец представляет человека, а каждая строка представляет позицию в этом геноме. Идея состоит в том, что области, которые взаимодействуют (физически связываются), имеют более высокую взаимную корреляцию. Чтобы проверить это, мне нужно рассчитать корреляцию между ВСЕМИ парами и посмотреть, имеют ли ИСТИННЫЕ взаимодействующие (которые я получил из других методов) более высокую корреляцию (в среднем), чем не взаимодействующие. Таким образом, невозможно вычислить все пары без ущерба для статистической релевантности.   -  person JudoWill    schedule 11.07.2011


Ответы (1)


Что ж, поскольку никто не придумал ответа, я опубликую свой текущий обходной путь на случай, если он понадобится кому-то еще. Я не уверен, насколько это «канонично» или эффективно, но до сих пор это работало.

Я помещаю имя файла в качестве первого элемента каждой строки файла, за которым следует \t, за которым следуют остальные данные. В этом примере я просто использую одно число в каждой строке, а затем усредняю ​​их, как очень тривиальный пример.

Затем я сделал следующий шаг уменьшения карты в mrjob.

class MRAvgPairwiseLines(MRJob):

def input_mapper(self, _, value):
    """Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""

    fnum, val = value.split('\t')
    yield 'ALL', (fnum, val)

def input_reducer(self, key, values):

    for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
        yield fnum1, (fnum1, fnum2, val1, val2)

def do_avg(self, key, value):

    fnum1, fnum2, val1, val2 = value
    res = (float(val1)+float(val2))/float(2)
    yield key, (fnum2, res)

def get_max_avg(self, key, values):

    max_fnum, max_avg = max(values, key = lambda x: x[1])
    yield key, (max_fnum, max_avg)

def steps(self):
    return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
                self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]

Таким образом, все выходные данные функции input_mapper группируются в одну и ту же input_reducer, которая затем объединяется в последовательные пары yield. Затем они проходят в нужные места, чтобы, наконец, вернуть наибольшее среднее значение (которое на самом деле является самым большим элементом во всех других файлах).

Надеюсь, это поможет кому-то.

person JudoWill    schedule 10.07.2011
comment
Таким образом, вы в конечном итоге загружаете в память все декартово произведение? - person Rafael Almeida; 24.10.2012