Как анализировать и агрегировать данные из DynamoDB

Эта статья была впервые опубликована на bahr.dev.
Подпишитесь на рассылку и получайте новые статьи прямо на свой почтовый ящик!

DynamoDB - это не база данных, предназначенная для выполнения аналитических запросов. Однако мы можем использовать потоки DynamoDB и лямбда-функции для выполнения этого анализа при каждом изменении данных.

В этой статье объясняется, как построить конвейер анализа, и демонстрируется это на двух примерах. Вы должны быть знакомы с таблицами DynamoDB и AWS Lambda.

Настройка трубопровода

Предполагая, что у нас уже есть таблица DynamoDB, нам нужно настроить еще две части: поток DynamoDB и лямбда-функцию. Поток испускает изменения, такие как вставки, обновления и удаления.

DynamoDB Stream

Чтобы настроить поток DynamoDB, мы воспользуемся консолью управления AWS. Откройте настройки своего стола и нажмите кнопку «Управление потоком».

По умолчанию вы можете выбрать «Новые и старые изображения», которые предоставят вам больше всего данных для работы. После того, как вы включили поток, вы можете скопировать его ARN, который мы будем использовать на следующем шаге.

Присоедините лямбда-функцию

Когда вы работаете с бессерверной структурой, вы можете просто установить поток в качестве источника событий для своей функции, добавив ARN как stream в разделе events.

functions:
  analysis:
    handler: analysis.handle
    events:
      - stream: arn:aws:dynamodb:us-east-1:xxxxxxx:table/my-table/stream/2020-02-02T20:20:02.002

Разверните изменения с помощью sls deploy, и ваша функция будет готова обрабатывать входящие события. Рекомендуется начать с печати данных из DynamoDB, а затем построить свою функцию на основе этого ввода.

Дизайн данных

При использовании DynamoDB очень важно сначала подумать о шаблонах доступа к данным, иначе вам придется перестраивать таблицы гораздо чаще, чем необходимо. Также посмотрите фантастические шаблоны дизайна Рика Хулихана для DynamoDB из re: Invent 2018 и re: Invent 2019.

Пример 1: Расчет цены

В EVE Online предметы экономики, управляемые игроками, можно продавать по контрактам. В моем хобби-проекте используется API EVE Online для получения информации о контрактах на обмен предметами для расчета цен на эти предметы. За последний год он собрал более 1,5 миллиона контрактов и рассчитал цены примерно на 7000 наименований.

Предварительная обработка

Чтобы построить среднюю цену, нам нужно несколько ценовых пунктов. По этой причине одного контракта, который мы получаем, недостаточно, но нам нужны все цены на товар.

{
    "contract_id": 152838252,
    "date_issued": "2020-01-05T20:47:40Z",
    "issuer_id": 1273892852,
    "price": 69000000,
    "location_id": 60003760,
    "contract_items": [2047]
  }

Поскольку ответ API не имеет оптимального формата, мы должны выполнить некоторую предварительную обработку, чтобы удалить ненужную информацию и поместить ключевую информацию в первичный ключ таблицы и ключ сортировки. Помните, что сканирование таблиц может быть дорогостоящим, а меньшие записи означают большее количество записей на результат запроса.

|type_id (pk)|date (sk)           |price   |location|
|2047        |2020-01-05T20:47:40Z|69000000|60003760|

В этом случае я решил использовать идентификатор элемента (например, 2047) в качестве первичного ключа и дату в качестве ключа сортировки. Таким образом, мой анализатор может выбрать все записи для одного элемента и ограничить его самыми последними записями.

Анализ

Присоединенная лямбда-функция получает событие из потока. Это событие содержит, среди прочего, идентификатор товара, для которого необходимо рассчитать новую цену. Используя этот идентификатор, функция запрашивает предварительно обработанные данные и получает список элементов, по которым она может вычислять средние значения и другую ценную информацию.

Внимание: не сканируйте здесь! Это быстро станет дорого. Создавайте свои данные так, чтобы можно было использовать запросы.

Агрегированный результат сохраняется в другой таблице, из которой мы можем получить API ценообразования.

Представление

Без дополнительной настройки функция анализа будет тем медленнее, чем больше доступно ценовых пунктов. Однако он может ограничить количество загружаемых ценовых пунктов. Сканируя дату в ключе сортировки в обратном направлении, мы загружаем только самые последние и наиболее релевантные записи. Затем, исходя из наших требований, мы можем решить загрузить только одну или две страницы или выбрать самые последние 1000 записей. Таким образом, мы можем установить верхнюю границу времени выполнения для каждого элемента.

Пример 2: таблица лидеров

Другой пример, основанный на обсуждении в твиттере, касается таблицы лидеров. В немецкой футбольной лиге Бундеслига клуб из Кельна сегодня выиграл 4: 0 у клуба из Фрайбурга. Это означает, что Кельн получает три очка, а Фрайбург - ноль. Загрузка всех матчей и последующий подсчет рейтинга на лету приведет к плохой результативности, когда мы углубимся в сезон. Вот почему мы снова должны использовать потоки.

Дизайн данных

Предположим, что наша первая таблица содержит необработанные данные в следующем формате:

|league (pk)|match_id (sk)|first_party|second_party|score|
|Bundesliga |1            |Cologne    |Freiburg    |4:0  |
|Bundesliga |2            |Hoffenheim |Bayer       |2:1  |

Мы разрабатываем таблицу лидеров в формате, в котором мы можем хранить несколько лиг и разбивать участников на страницы. Мы будем использовать составной ключ сортировки, так как хотим, чтобы база данных сортировала таблицу лидеров сначала по счету, затем по количеству забитых голов и, наконец, по их имени.

|league (pk)|score#goals#name (sk)|score|goals_shot|name (GSI)|
|Bundesliga |003#004#Cologne      |3    |4         |Cologne   |
|Bundesliga |003#002#Hoffenheim   |3    |2         |Hoffenheim|
|Bundesliga |000#001#Bayer        |0    |1         |Bayer     |
|Bundesliga |000#000#Freiburg     |0    |0         |Freiburg  |

Поскольку ключ сортировки (sk) представляет собой строку, мы должны обнулить числа. Сортировка нескольких строк, содержащих числа, не даст того же результата, что и сортировка простых чисел. Выбирайте отступы с умом и выбирайте на пару порядков больше, чем вы ожидаете получить. Учтите, что этот подход не сработает, если ваши результаты могут расти бесконечно. Если у вас есть решение этой проблемы, поделитесь им, и я дам вам ссылку здесь!

Мы также добавляем GSI к названию клуба, чтобы иметь лучший доступ к записи в таблице лидеров одного клуба.

Анализ

Каждый раз, когда результат совпадения вставляется в первую таблицу, поток запускает событие для функции анализа. Эта запись содержит матч и его счет, по которому мы можем определить, кто сколько очков наберет.

Основываясь на названиях клубов, мы можем загрузить старые записи в таблице лидеров. Мы используем эти записи, чтобы сначала удалить существующие записи, затем взять существующие оценки и цели, добавить новые и записать новые записи в таблице лидеров.

|league (pk)|score#goals#name (sk)    |score|goals_shot|name (GSI)|
|Bundesliga |006#005#Cologne          |6    |5         |Cologne   |
|Bundesliga |003#005#Bayer            |3    |5         |Bayer     |
|Bundesliga |003#002#Hoffenheim       |3    |2         |Hoffenheim|
|Bundesliga |000#000#Freiburg         |0    |0         |Freiburg  |
...

Представление

Поскольку каждое совпадение приводит к одному или двум запросам и одному или двум обновлениям, время на обновление счета остается ограниченным.

Когда мы отображаем таблицу лидеров, рекомендуется использовать нумерацию страниц. Таким образом, пользователь видит соответствующий объем данных, и наши запросы также имеют ограниченное время выполнения.

Вам понравилась эта статья? Дайте мне знать в Твиттере и Инициативе для списка рассылки, чтобы получать новые статьи прямо на ваш почтовый ящик!