Шаблоны проектирования в MPI: спящий корневой процесс при блокировке отправки и правильная балансировка нагрузки

Я запускаю код MPI на Python, используя mpi4py, который выглядит примерно так:

from mpi4py import MPI
import numpy as np
import os

comm = MPI.COMM_WORLD
rank = comm.Get_Rank()
size = comm.Get_Size()

if rank == 0:
  res = np.zeros(2**16)
  jobs = os.listdir('/my/data/dir')
  for i in xrange(len(jobs)):
    proc = (i % (size - 1)) + 1 #lacks load balancing
    buf = load_buf_from_file(job[i])
    #root waits here at 100%
    comm.Send([buf, dtype], dest = proc) #lacks load balancing
    comm.Recv([res, dtype], source = MPI.ANY_SOURCE)
    save_result_to_file(res)
else:
  buf = np.zeros(2**16)
  comm.Recv([buf, dtype], source = 0)
  res = do_lots_of_work(buf)
  comm.Send([res, dtype], dest = 0)

Я замечаю, что процесс root всегда занят (ЦП на 100%). Я предпочитаю, чтобы корневой процесс спал до тех пор, пока рабочий процесс не будет готов принять следующее сообщение. Какие шаблоны в программировании MPI способствуют такому поведению? Возможно, корневой процесс тоже должен работать?

Еще один недостаток этой схемы заключается в следующем... Если рабочий процесс 4 завершается раньше, чем 3, то 4 должен дождаться завершения 3, прежде чем получить новое сообщение от root, чтобы продолжить выполнение работы. Любые предложения о том, как спроектировать корневой процесс, который всегда пытается отправить следующее сообщение бездействующему процессу? Меня это в основном устраивает, потому что первый процесс, получивший сообщение, обычно завершается первым. Однако, если рабочая нагрузка изменяется для каждого сообщения, это не всегда так.

Спасибо, Кевин


person bfb    schedule 14.09.2014    source источник
comment
Конечно, вы можете улучшить это, но действительно ли MPI лучше всего подходит для того, что вы пытаетесь сделать здесь? В частности, почему программирование с помощью MPI лучше, чем просто создание сценария этого списка заданий с помощью gnu-parallel?   -  person Jonathan Dursi    schedule 15.09.2014
comment
Конечно, эта задача зернистая и смущающе параллельная. Я хотел изучить MPI, потому что я думаю, что это будет более полезно в будущем.   -  person bfb    schedule 15.09.2014


Ответы (4)


На ваш первый вопрос об использовании процессора, когда ранг 0 находится в Comm.Recv. Это проблема реализации. MPICH (и, возможно, многие другие) ожидают событий в узком цикле опроса, чтобы минимизировать задержку.

Ваш второй вопрос: если рабочие подразделения нерегулярны, как сбалансировать нагрузку. Ответ — неблокирующие операции. (Isend, Irecv и т.д.).

Один из возможных рабочих процессов может быть таким:

  • ранг 0 имеет очередь рабочих единиц
  • ранг 0 отправляет неблокирующую отправку каждому клиенту
  • когда клиент хочет работать, он получает от сервера и отправляет обратно сообщение о готовности
  • сервер получает сообщение о готовности и отправляет единицу работы.
  • сервер также выдает неблокирующий прием для возможного сообщения «я готов».
  • когда любой клиент завершает работу, он выдает сообщение "Я закончил, дайте мне еще"
  • сервер отправляет следующую рабочую единицу в очереди.
person Rob Latham    schedule 15.09.2014
comment
Использует ли MPI очередь для отслеживания всех неблокирующих запросов? Или это должен реализовать программист? Размер некоторых сообщений, которые я отправляю, составляет несколько МБ, и я предпочитаю не выделять память заранее для каждого сообщения, а только тогда, когда сообщение готово к использованию. - person bfb; 15.09.2014
comment
Существует правило повторного использования буфера, о котором должен помнить программист: буфер, который является частью неблокирующей отправки или получения, принадлежит MPI до тех пор, пока запрос не будет завершен. Вы бы поддерживали рабочую очередь, но могли выполнять неблокирующие операции для каждого клиента только по мере необходимости. - person Rob Latham; 15.09.2014
comment
Я понимаю. Я предпочитаю использовать Iprobe, чтобы определить, готов ли клиент отправить сообщение im done, потому что это позволяет избежать необходимости отслеживать N буферов, принадлежащих MPI. Мне кажется, что Iprobe с блокировкой вызовов Send/Recv удовлетворяет ту же проблему с меньшей сложностью. - person bfb; 15.09.2014

Возможно, использование одного ранга в качестве сервера для распределения заданий лучше всего подходит для балансировки нагрузки:

#!/usr/bin/env python
import mpi4py

import numpy as np
import os
import time

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
root = 0

if rank == root:
  for i in range(50):
    d = comm.recv(source = MPI.ANY_SOURCE)
    comm.send(i, dest = d)
  for i in range(size-1):
    d = comm.recv(source = MPI.ANY_SOURCE)
    comm.send(-1, dest = d)
    print('Closing', d)
else:
    while True:
        comm.send(rank, root)
        job = comm.recv(source = root)
        if job < 0: break

        print('Rank {} Job {}'.format(rank, job))
        time.sleep(np.random.random()%10)
person Arvind    schedule 10.02.2016

Я решаю эту проблему, добавляя больше логики в процедуру MPI перед отправкой от корня:

if i > size - 1:
  #probe for response, and send next message to proc that responds first
  #sleep 1 second between probing different procs
  r = 1
  while not comm.Iprobe(source = r):
    time.sleep(1)
    r = (r % (size - 1)) + 1
  res = comm.Recv([res, dtype], source = r)
  proc = r
else:
  #initialize the sends in serial (proc 1, ..., size-1)
  proc = i + 1

Есть ли другой способ сделать это в MPI?

person bfb    schedule 14.09.2014

Я только что сделал что-то очень похожее на ваш ответ, но могу предложить пару альтернатив. Я также опубликовал упрощенную версию моего кода.

Во-первых, вы можете сделать его более отзывчивым, прослушивая первый ответ от любого рабочего процесса. Для этого существует специальное исходное значение MPI.ANY_SOURCE. Используйте объект состояния, чтобы определить, из какого фактического источника пришло сообщение.

# Set this to 0 for maximum responsiveness, but that will peg CPU to 100%
sleep_seconds = 0.1
if sleep_seconds > 0:
    while not comm.Iprobe(source=MPI.ANY_SOURCE):
        time.sleep(sleep_seconds)

status = MPI.Status()
result = comm.recv(source=MPI.ANY_SOURCE, status=status)
logging.info('Received %r from rank %d', result, status.Get_source())

Я провел поиск и обнаружил, что занятое ожидание - это то, что вы хотите, если вы не делите свои процессоры с другими задачами. В этом случае вы просто установите для sleep_seconds значение 0 в моем фрагменте или просто вызовете recv() напрямую. Иногда нам приходится делиться своим окружением, поэтому я собираюсь провести опрос.

Примеры mpi4py-examples Йорга Борншайна включают пример получения задачи, который распределяет задачи разной длины по набору рабочих. Думаю, замена его вызова recv() моим фрагментом выше даст вам хорошее решение.

У меня была лишняя заморочка, что некоторые мои воркеры запускали многопоточные задачи, поэтому я не хотел перегружать свои процессоры. Я опубликовал суть многопоточности, в которой известно, сколько потоков будет запускать каждая задача, и отложены дополнительные рабочие процессы. для дополнительных потоков. Например, если задача будет выполняться в четырех потоках, мастер будет ждать, пока на одном хосте не будет четырех доступных рабочих процессов, затем он передаст задачу одному из рабочих процессов и оставит три других бездействующими, пока задача не завершится. .

person Don Kirkby    schedule 28.01.2015