Привет! Сегодня я хочу поговорить о производственной реализации системы прогнозирования на базе сервисов XGBoost, Apache Spark и AWS.
Вы можете прочитать об основных идеях xgboost + Spark здесь:
Https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html
Наша задача:
поток создания прогнозов классификации зданий. Есть специалисты по данным, которые хотят иметь возможность обновлять набор данных и / или модель без привлечения инженеров по данным в наш поток.
Например, мы хотим спрогнозировать конверсии пользователей на N-й день.
Наша ситуация:
- Специалисты по обработке данных (DS), которые могут работать в Rstudio (но не могут работать с R Spark API) и могут создавать модели с помощью xgboost.
- Инженеры по данным (DE), которые могут работать в EMR (Hadoop), используя Java, Scala, Spark и многое другое.
- Ежедневные пакетные вычисления.
Первая проблема:
Различные источники данных. DS и DE используют разные источники данных для создания окончательных наборов данных, например DS используют витрины данных, DE используют необработанные или полу-необработанные данные. Теоретически они должны получить тот же результат, потому что витрины данных создаются из необработанных данных. Но в реальной жизни существует множество проблем, которые часто делают наборы данных результатов из DE и DS разными. Основная причина различий - ошибки вычислений, которые совершаются либо DS, либо DE.
Вторая проблема:
Перенос набора данных и модели.
DE не могут работать со сценариями R, а DS не могут работать со Spark или Scala / Java, и это проблема для переноса модели и набора данных из Rstudio в вычисления EMR для создания повседневных прогнозов по всем данным.
Итак, решим первую проблему:
Мы должны настроить данные, используя
Во-первых, необходимо организовать работу с данными таким образом, чтобы специалисты по данным и дата-инженеры имели одинаковый доступ к одним и тем же данным.
Одним из решений является хранение всех данных в S3 в эффективном формате (Apache ORC или Apache Parquet) с таблицами в Athena (с метакаталогом Glue), которые создаются на основе этих данных.
Во-вторых, мы должны создать функцию в R, которая создает соединения с Athena через JDBC, выполняет запросы и возвращает результаты в формате R dataframe. Таким образом, с помощью этого наши DS могут писать SQL-запросы из R к нашим данным и в дальнейшем работать с результатами в общем формате.
И последнее, но не менее важное: мы должны создать аналогичное решение для наших DE. Мы можем написать несколько классов-оболочек для JDBC, Athena, aws sdk таким образом, чтобы DE мог передавать сценарий SQL, контекст Spark в некоторый метод нашего класса-оболочки и получать объект Spark Dataset с данными результата. Набор данных результата может быть огромным, и чтение огромных данных через JDBC приведет к снижению скорости или вообще к OOM. Вот почему важно получить результат от Athena не через JDBC, а сохранить результат в формате ORC или Parquet в S3, а затем прочитать его Spark.
Решим вторую проблему:
Частично мы решили эту проблему - теперь мы можем передавать набор данных из DS в DE, передав SQL-скрипт для создания этого набора данных в Athena.
Без ответа остался один вопрос: как перенести модель и запустить ее на кластере. И здесь нам помогает xgboost!
DS уже получили данные от Athena в формате набора данных R. Затем они могут очень быстро разрабатывать наборы данных, используя возможности R, и тестировать модели, основанные на прототипах наборов данных. Получив удовлетворительную точность модели и набора данных, они перемещают создание набора данных из Athena SQL + R в параметризованный запрос Athena SQL. Наконец, они сохраняют сценарий SQL, параметры для него и модель xgboost в S3.
DE создают искровую работу с помощью следующих шагов:
- Чтение шаблона SQL-скрипта, параметров для него и модели.
- Создание финального SQL-скрипта
- Создание набора данных в Spark с использованием этого скрипта и нашего класса-оболочки, о котором мы говорили ранее.
- Преобразование этого набора данных в векторизованный набор данных с поддержкой машинного обучения.
- Распределенные вычисления предсказаний
- Сохранение результата в S3
Кроме того, вам необходимо создать API, с помощью которого ваш отдел маркетинга сможет получать прогнозы и использовать их в своем бизнесе.
Давайте рассмотрим каждый шаг
- Заранее оговариваем названия скрипта, модели и файла с параметрами, а также место, где ДС будут его хранить в S3. Мы пишем простой класс / метод, который читает эти данные.
private static void loadModel(String modelPath) { logger.info("Begin loading model"); FileSystem fileSystem = new S3FileSystemHelper().createAndInitialiseS3FileSystem(modelPath); FSDataInputStream fsDataInputStream; try { fsDataInputStream = fileSystem.open(new Path(modelPath)); } catch (IOException e) { throw new RuntimeException("Can't create init FileSystem or read model", e); } logger.info("Model was read"); Booster model; try { model = XGBoost.loadModel(fsDataInputStream); } catch (XGBoostError xgBoostError) { throw new RuntimeException("Can't load model ", xgBoostError); } logger.info("Model was loaded"); xgbClassificationModel = new XGBoostClassificationModel("test", 2, model); xgbClassificationModel.set(xgbClassificationModel.missing(), 0.0F); }
2. Обговариваем формат параметров. У нас будет ряд динамических параметров, например, дата регистрации пользователей, каждый день нам нужно будет брать пользователей, которые зарегистрировались N дней назад. Следовательно, нужно правильно интерпретировать эти параметры. Для этого мы пишем класс, который будет правильно интерпретировать эти параметры и возвращать нам конкретные значения, которые необходимо подставить в сценарий SQL.
3. Создание набора данных в Spark с использованием нашего класса-оболочки.
4. Важно получить исключения столбцов для векторов признаков из DS. Таким образом, они могут изменить схему набора данных без каких-либо изменений из DE, потому что наш код может на лету понять, какие столбцы следует исключить.
private static Dataset<Row> transformRawDataSetToMlDataSet(Dataset<Row> inputData, List<String> colsForExclusion) { List<String> cols = new ArrayList<>(Arrays.asList(inputData.columns())); //removing non-feature cols for (String col : colsForExclusion) { cols.remove(col); } //cast datatype of featured columns to float for (String col : cols) { inputData = inputData.withColumn(col + “_float”, inputData.col(col).cast(“float”)) .drop(col) .withColumnRenamed(col + “_float”, col); } Dataset labelTransformed = inputData; // transform features fields in one matrix field VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(cols.toArray(new String[]{})) .setOutputCol(“features”); vectorAssembler.setHandleInvalid(“skip”); //Here we creating dataset ready to work with xgboost: 1 vector column “features”, and columns for identification user return vectorAssembler.transform(labelTransformed).select(“features”, JavaConversions.asScalaBuffer(colsForExclusion).toList()); }
5. Распределенные прогнозные вычисления:
Набор данных ‹Row› mlDataSet = transformRawDataSetToMlDataSet (inputData, colsForExclusion);
Набор данных ‹Row› result = xgbClassificationModel .transform (mlDataSet);
result.createOrReplaceTempView («результат»);
6. Сохраняем результат в s3. Например, мы сохраняем только тех пользователей, у которых есть класс 0 (в контексте нашей задачи они никогда не будут платить).
spark.sql(“SELECT user_id “ + “FROM result “ + “WHERE prediction < 1 “) .coalesce(1) .write() .orc(resultPath);
С таким потоком ваши DS могут изменить набор данных, улучшить модель и передать все это в S3. И эти обновления будут автоматически учтены в расчетах. Почему? Чтобы сократить время вывода на рынок, обмен данными между DE и DS, а также уменьшить количество рутинных задач для DE.
Спасибо за внимание, вопросы и предложения приветствуются.