Несколько соображений относительно Dask Futures для повышения эффективности вычислений и управления данными.

Разбивка ваших задач Dask на более мелкие фрагменты с одной операцией может улучшить распараллеливание рабочего процесса. Это верно, но также требуется немного подумать о том, как эти задачи передаются планировщику. Цель этой статьи — дать некоторые полезные советы по использованию Dask Futures.

Код, который привел к этой статье, был реализован для создания входного массива для алгоритма Dask KMeans. Массив был предоставлен в виде изображений с очень высоким разрешением (VHR) (массив представлен как «arr» в последующем коде) и преобразован и изменен с (1, 2269, 2029) на (4,6M, 16) — это 4,6 миллиона строк. из 16 объектов с разрешением 2,5x2,5 м. 16 функций включают растровые данные RGB и расчетные индексы как с дронов, так и с изображений Sentinel-2. Исходное пространственное разрешение этих данных составляет 0,15x0,15 м, и в настоящее время они пересчитаны до 2,5x2,5 м (с понижением частоты дискретизации для дрона и с повышением частоты дискретизации для Sentinel-2) для обработки огромного объема обрабатываемых данных.

В качестве типичного подхода к подбору алгоритма KMeans можно взять случайную выборку точек входных данных для определения центроидов кластера. Эта информация является важной частью открытия правильного способа использования Dask Futures.

Ниже представлена ​​заготовка для создания алгоритма кластера KMeans в Dask.

import dask_ml.cluster
km = dask_ml.cluster.KMeans(n_clusters, n_iterations, oversampling)

НЕ

Имеет смысл разбить задачи на логические шаги, поэтому передача операций с массивами и предварительная сборка фьючерсов помогают построить граф вычислений, который будет обрабатываться планировщиком Dask.

def get_arr(arr, i):
     return da.where(
               (((da.isnan(arr[i]))|(da.isinf(arr[i])))), -1, arr[i]
            )
smpl = random.sample(range(arr.shape[0]), sample_size)
stk = [client.submit(get_arr, arr, i) for i in smpl]
X = client.submit(da.vstack, X)
km_future = client.submit(km.fit, X)
km = client.gather(km_future)

Почему вы должны неделать это?

Понимание Pythonic List известно своей эффективностью по сравнению с итерацией цикла for. Этот подход _является_ более быстрым в этом отношении. Однако из-за последовательной отправки нетерпеливых задач планировщику этот подход недостаточно использует распределенные возможности Dask.

Этот подход также создает большую нагрузку на планировщик. Особенно при больших объемах данных и при неправильном управлении размерами фрагментов. Уменьшение размера фрагмента может снизить нагрузку на оперативную память, но все равно существует значительная утечка. Это давление на оперативную память в конечном итоге приводит к тому, что планировщик сам останавливает операции, которые проявляются только как «зависание» в задачах, обрабатываемых на неопределенный срок. В некоторые моменты, когда нагрузка на ОЗУ становилась слишком большой, модуль планировщика исключался из пула узлов, и поэтому рабочий процесс необходимо было запускать с самого начала.

Как упоминалось ранее в отношении случайной выборки точек, этот подход с трудом обрабатывал 512 точек данных. Часто вылетает с ошибкой KilledWorker из-за неэффективного управления сборкой и укладкой входных данных. Даже при 512 точках данных подготовка и подбор данных заняли четыре минуты.

DO

Теперь 512 точек данных — это крошечная часть из 4,6 млн доступных точек для выборки. Не говоря уже о высоких требованиях к ресурсам, которым подвергается планировщик, частых зависаниях и потере прогресса. Это необходимо решить, если какой-либо содержательный анализ может быть извлечен из последующей работы. Ниже, сделав шаг назад и подумав о том, что на самом деле означает отправка задач удаленному планировщику/кластеру, мы представляем переработку предыдущего подхода.

Пара соображений, которые были сделаны:

  • Где живут данные? Где должны храниться данные?
  • Даск предпочитает «чистые» функции.
def get_arr(arr, smpl):
    fit_stk = [
         da.where(
                (((da.isnan(arr[i]))|(da.isinf(arr[i])))), 
                  -1, arr[i]) for i in smpl
    ]
    return fit_stk
def stack_arr(fit_stk):
    X = da.vstack(fit_stk)
    X = X.rechunk({0: -1, 1: -1})
    return X
def fit_arr(km, X):
    return km.fit(X)
smpl = random.sample(range(arr.shape[0]), smple_size)
stk_list = client.submit(get_arr, arr, smpl)
X = client.submit(stack_arr, stk_list)
km_future = client.submit(fit_arr, km, X)
km = client.gather(km_future)

Почему мы должны делать это?

Этот подход разбивает задачи планировщика на фактические удаленные функции. Раньше методы массивов передавались как функция — это требовало раздачи массива с его методами воркерам.

Переход к построению входного стека в кластер путем помещения понимания списка внутрь удаленной функции, где живут данные, устранит последовательную обработку индексов в `get_arr`. Вместо того, чтобы ждать 512 последовательных шагов, обычно занимающих несколько минут, теперь это возвращает Future (нетерпеливо обработанный в кластере) в течение миллисекунд.

Что касается входных точек данных, этот подход позволил успешно подобрать 16 тысяч точек данных для алгоритма KMeans (это заняло 1 час 35 минут). Предыдущий подход «Нельзя» позволял обрабатывать 512 точек данных за четыре минуты. Для сравнения, этот подход «Выполнение» позволяет обрабатывать 2048 точек данных за то же время. Как мы видим, огромное улучшение! Конечно, наличие большего количества точек данных для подгонки повысит точность алгоритма при прогнозировании невидимых точек, и это было основной причиной пересмотра первоначального подхода.

Не менее важно то, что нагрузка на оперативную память была устранена, что позволило избежать исключений планировщика из Kubernetes. Больше не нужно начинать с нуля!

Сводка

Распределение требует шага назад при рассмотрении ваших целей. Сделать шаг назад, чтобы рассмотреть _how_ операцию, с которой можно взаимодействовать в распределенной среде, не так уж и очевидно, особенно если она скрыта за API. Эта статья призвана помочь в принятии решения о том, как реализовать функции, которые будут отправлены в виде фьючерсов в кластер Dask.