Проверка всех рангов верна без использования сбора и разброса mpi4py

Я пытаюсь общаться между процессами, чтобы каждый процесс был уведомлен, когда все остальные процессы готовы. Фрагмент кода ниже делает это. Есть ли более элегантный способ сделать это?

def get_all_ready_status(ready_batch):
    all_ready= all(ready_batch)
    return [all_ready for _ in ready_batch]

ready_batch= comm.gather(ready_agent, root=0)
if rank == 0:
    all_ready_batch = get_all_ready_status(ready_batch)
all_ready_flag = comm.scatter(all_ready_batch , root=0)                


person terenceflow    schedule 17.03.2021    source источник


Ответы (1)


Если всем процессам необходимо знать, какие другие процессы готовы, вы можете использовать процедуру comm.Allgather:

from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendBuffer = numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(size, dtype=bool)

print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allgather([sendBuffer,  MPI.BOOL],[recvBuffer, MPI.BOOL])
print("After Allgather  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))

Вывод:

Before Allgather => Process 0 | sendBuffer [ True] | recvBuffer [False False]
Before Allgather => Process 1 | sendBuffer [ True] | recvBuffer [False False]
After Allgather  => Process 0 | sendBuffer [ True] | recvBuffer [ True  True]
After Allgather  => Process 1 | sendBuffer [ True] | recvBuffer [ True  True]

Как указано в комментариях @Gilles Gouaillardet:

если всем процессам нужно только знать, готовы ли все процессы, тогда MPI_Allreduce() подходит даже лучше.

Идея состоит в том, что теоретически Allreduce должен быть быстрее, чем Allgather, потому что первый может использовать древовидной модели связи, а также потому, что для этого потребуется выделять и передавать меньше памяти. Дополнительную информацию можно найти здесь.

В вашем случае вы используете MPI.LAND (т.е. логическое и) в качестве оператора операции Allreduce.

Пример:

from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendBuffer =  numpy.ones(1, dtype=bool) if rank % 2 ==  0 else numpy.zeros(1, dtype=bool)
recvBuffer = numpy.zeros(1, dtype=bool)

print("Before Allreduce => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allreduce([sendBuffer,  MPI.BOOL],[recvBuffer, MPI.BOOL], MPI.LAND)
print("After Allreduce  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))

comm.Barrier()
if rank == 0:
   print("Second RUN")
comm.Barrier()

sendBuffer =  numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(1, dtype=bool)

print("Before Allreduce => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allreduce([sendBuffer,  MPI.BOOL],[recvBuffer, MPI.BOOL], MPI.LAND)
print("After Allreduce  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))

Вывод:

Before Allreduce => Process 1 | sendBuffer [False] | recvBuffer [False]
Before Allreduce => Process 0 | sendBuffer [ True] | recvBuffer [False]
After Allreduce  => Process 1 | sendBuffer [False] | recvBuffer [False]
After Allreduce  => Process 0 | sendBuffer [ True] | recvBuffer [False]
Second RUN
Before Allreduce => Process 0 | sendBuffer [ True] | recvBuffer [False]
Before Allreduce => Process 1 | sendBuffer [ True] | recvBuffer [False]
After Allreduce  => Process 0 | sendBuffer [ True] | recvBuffer [ True]
After Allreduce  => Process 1 | sendBuffer [ True] | recvBuffer [ True]

В первой части выходных данных (т. е. перед вторым запуском) результатом будет FALSE, поскольку процессы с четным рангом не готовы (т. е. False), а процессы с нечетным рангом готовы. Следовательно, False & True => False. Во второй части результат True, потому что все процессы были готовы.

person dreamcrash    schedule 17.03.2021