Привет! Сегодня я хочу поговорить о производственной реализации системы прогнозирования на базе сервисов XGBoost, Apache Spark и AWS.

Вы можете прочитать об основных идеях xgboost + Spark здесь:

Https://medium.com/cloudzone/xgboost-distributed-training-and-predicting-with-apache-spark-1127cdfb31ae.

Https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html

Наша задача:

поток создания прогнозов классификации зданий. Есть специалисты по данным, которые хотят иметь возможность обновлять набор данных и / или модель без привлечения инженеров по данным в наш поток.

Например, мы хотим спрогнозировать конверсии пользователей на N-й день.

Наша ситуация:

  • Специалисты по обработке данных (DS), которые могут работать в Rstudio (но не могут работать с R Spark API) и могут создавать модели с помощью xgboost.
  • Инженеры по данным (DE), которые могут работать в EMR (Hadoop), используя Java, Scala, Spark и многое другое.
  • Ежедневные пакетные вычисления.

Первая проблема:

Различные источники данных. DS и DE используют разные источники данных для создания окончательных наборов данных, например DS используют витрины данных, DE используют необработанные или полу-необработанные данные. Теоретически они должны получить тот же результат, потому что витрины данных создаются из необработанных данных. Но в реальной жизни существует множество проблем, которые часто делают наборы данных результатов из DE и DS разными. Основная причина различий - ошибки вычислений, которые совершаются либо DS, либо DE.

Вторая проблема:

Перенос набора данных и модели.

DE не могут работать со сценариями R, а DS не могут работать со Spark или Scala / Java, и это проблема для переноса модели и набора данных из Rstudio в вычисления EMR для создания повседневных прогнозов по всем данным.

Итак, решим первую проблему:

Мы должны настроить данные, используя

Во-первых, необходимо организовать работу с данными таким образом, чтобы специалисты по данным и дата-инженеры имели одинаковый доступ к одним и тем же данным.

Одним из решений является хранение всех данных в S3 в эффективном формате (Apache ORC или Apache Parquet) с таблицами в Athena (с метакаталогом Glue), которые создаются на основе этих данных.

Во-вторых, мы должны создать функцию в R, которая создает соединения с Athena через JDBC, выполняет запросы и возвращает результаты в формате R dataframe. Таким образом, с помощью этого наши DS могут писать SQL-запросы из R к нашим данным и в дальнейшем работать с результатами в общем формате.

И последнее, но не менее важное: мы должны создать аналогичное решение для наших DE. Мы можем написать несколько классов-оболочек для JDBC, Athena, aws sdk таким образом, чтобы DE мог передавать сценарий SQL, контекст Spark в некоторый метод нашего класса-оболочки и получать объект Spark Dataset с данными результата. Набор данных результата может быть огромным, и чтение огромных данных через JDBC приведет к снижению скорости или вообще к OOM. Вот почему важно получить результат от Athena не через JDBC, а сохранить результат в формате ORC или Parquet в S3, а затем прочитать его Spark.

Решим вторую проблему:

Частично мы решили эту проблему - теперь мы можем передавать набор данных из DS в DE, передав SQL-скрипт для создания этого набора данных в Athena.

Без ответа остался один вопрос: как перенести модель и запустить ее на кластере. И здесь нам помогает xgboost!

DS уже получили данные от Athena в формате набора данных R. Затем они могут очень быстро разрабатывать наборы данных, используя возможности R, и тестировать модели, основанные на прототипах наборов данных. Получив удовлетворительную точность модели и набора данных, они перемещают создание набора данных из Athena SQL + R в параметризованный запрос Athena SQL. Наконец, они сохраняют сценарий SQL, параметры для него и модель xgboost в S3.

DE создают искровую работу с помощью следующих шагов:

  1. Чтение шаблона SQL-скрипта, параметров для него и модели.
  2. Создание финального SQL-скрипта
  3. Создание набора данных в Spark с использованием этого скрипта и нашего класса-оболочки, о котором мы говорили ранее.
  4. Преобразование этого набора данных в векторизованный набор данных с поддержкой машинного обучения.
  5. Распределенные вычисления предсказаний
  6. Сохранение результата в S3

Кроме того, вам необходимо создать API, с помощью которого ваш отдел маркетинга сможет получать прогнозы и использовать их в своем бизнесе.

Давайте рассмотрим каждый шаг

  1. Заранее оговариваем названия скрипта, модели и файла с параметрами, а также место, где ДС будут его хранить в S3. Мы пишем простой класс / метод, который читает эти данные.
private static void loadModel(String modelPath)
{
  logger.info("Begin loading model");
  FileSystem        fileSystem = new S3FileSystemHelper().createAndInitialiseS3FileSystem(modelPath);
  FSDataInputStream fsDataInputStream;
  try
  {
    fsDataInputStream = fileSystem.open(new Path(modelPath));
  }
  catch (IOException e)
  {
    throw new RuntimeException("Can't create init FileSystem  or read model", e);
  }
  logger.info("Model was read");

  Booster model;
  try
  {
    model = XGBoost.loadModel(fsDataInputStream);
  }
  catch (XGBoostError xgBoostError)
  {
    throw new RuntimeException("Can't load model ", xgBoostError);
  }
  logger.info("Model was loaded");

  xgbClassificationModel = new XGBoostClassificationModel("test", 2, model);

  xgbClassificationModel.set(xgbClassificationModel.missing(), 0.0F);
}

2. Обговариваем формат параметров. У нас будет ряд динамических параметров, например, дата регистрации пользователей, каждый день нам нужно будет брать пользователей, которые зарегистрировались N дней назад. Следовательно, нужно правильно интерпретировать эти параметры. Для этого мы пишем класс, который будет правильно интерпретировать эти параметры и возвращать нам конкретные значения, которые необходимо подставить в сценарий SQL.

3. Создание набора данных в Spark с использованием нашего класса-оболочки.

4. Важно получить исключения столбцов для векторов признаков из DS. Таким образом, они могут изменить схему набора данных без каких-либо изменений из DE, потому что наш код может на лету понять, какие столбцы следует исключить.

private static Dataset<Row> transformRawDataSetToMlDataSet(Dataset<Row> inputData, List<String> colsForExclusion)
{
List<String> cols = new ArrayList<>(Arrays.asList(inputData.columns()));
//removing non-feature cols
for (String col : colsForExclusion)
{
cols.remove(col);
}
//cast datatype of featured columns to float
for (String col : cols)
{
inputData = inputData.withColumn(col + “_float”, inputData.col(col).cast(“float”))
.drop(col)
.withColumnRenamed(col + “_float”, col);
}
Dataset labelTransformed = inputData;
// transform features fields in one matrix field
VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(cols.toArray(new String[]{}))
.setOutputCol(“features”);
vectorAssembler.setHandleInvalid(“skip”);
//Here we creating dataset ready to work with xgboost: 1 vector column “features”, and columns for identification user
return vectorAssembler.transform(labelTransformed).select(“features”, JavaConversions.asScalaBuffer(colsForExclusion).toList());
}

5. Распределенные прогнозные вычисления:
Набор данных ‹Row› mlDataSet = transformRawDataSetToMlDataSet (inputData, colsForExclusion);
Набор данных ‹Row› result = xgbClassificationModel .transform (mlDataSet);
result.createOrReplaceTempView («результат»);

6. Сохраняем результат в s3. Например, мы сохраняем только тех пользователей, у которых есть класс 0 (в контексте нашей задачи они никогда не будут платить).

spark.sql(“SELECT user_id “ +
          “FROM result “ +
          “WHERE prediction < 1 “)
.coalesce(1)
.write()
.orc(resultPath);

С таким потоком ваши DS могут изменить набор данных, улучшить модель и передать все это в S3. И эти обновления будут автоматически учтены в расчетах. Почему? Чтобы сократить время вывода на рынок, обмен данными между DE и DS, а также уменьшить количество рутинных задач для DE.

Спасибо за внимание, вопросы и предложения приветствуются.