[Перепост из моего старого блога. Исходная дата публикации: 04.04.2014]

Это первая часть из трех частей, охватывающих все аспекты нашего анализа данных. В этой части описывается наш пайплайн. Во второй части будут подробно описаны конкретные дизайнерские решения. В третьей части рассказывается об истории аналитики на Twitch. Если в какой-то момент вы думаете про себя, что хотели бы узнать больше о том, что мы делаем, вам следует посетить нашу страницу команды.

Вначале мы записывали все наши данные в Mixpanel. По мере того, как наша пользовательская база росла, мы регистрировали все большее количество событий, этот рост точек данных значительно опережал наш рост пользователей до точки, когда мы отправляли Mixpanel миллиарды событий в неделю. По мере того, как наш рост продолжался, нам нужно было принимать лучшие решения, основанные на присоединении к различным мероприятиям, чтобы получить действительно глубокое представление о поведении наших пользователей. Метрики, основанные на подсчете, такие как предоставляемые Mixpanel или statsd, недостаточны, когда дело доходит до этого, и, учитывая постоянно растущую стоимость Mixpanel, мы собрали команду для работы над экономичным хранением наших данных о событиях, предоставляя инструменты для запросов. данные без этих недостатков.

Сегодня мы заменили характер Mixpanel, работающий в режиме, близком к реальному времени, нашим собственным решением для массовой загрузки данных, которое позволяет достичь нашей цели по задержке данных ‹24 часа. В настоящее время наша задержка составляет ~ 5 часов, и вскоре мы будем работать над «быстрым уровнем» поверх нашего текущего решения, чтобы предоставить нам возможность иметь информационные панели, отображающие статистику в реальном времени.

Мы сформировали команду в конце 2013 года и всерьез начали работу над проектом в начале 2014 года. Новый пайплайн, в настоящее время его третья версия, размещен на AWS, где мы используем Asgard для координации развертываний и эластичного масштабирования. Трубопровод состоит из следующих основных компонентов:

  1. Клиент присылает нам статистику
  2. Пограничный сервер
  3. Процессор
  4. Координатор загрузки
  5. Место хранения

Клиент

Естественно, конвейер начинается с пользователей клиентских приложений. У нас есть множество клиентов: Xbox One, iOS, наш веб-сайт, наш Flash-видеоплеер, различные порты Android и т. д. Поскольку все эти клиенты имеют зрелые и надежные HTTP-клиенты, мы используем HTTP для отправки событий на пограничные серверы конвейера статистики.

Эти клиенты знают, когда генерировать события, относящиеся к нашему продукту в целом. Наиболее распространенная статистика, которую регистрируют клиенты, — «минутное просмотрение»; каждый из наших видеоплееров выдает это за каждую минуту видео, просматриваемого зрителем, вместе с набором метаданных JSON, описывающих состояние клиента. Наиболее важной статистикой является «буфер пуст», который отправляется, когда видео заикается. Когда это происходит, мы знаем, что у кого-то было плохое впечатление от просмотра — наши видео- и сетевые команды принимают важные решения о масштабировании на этих событиях в рамках нашего непрекращающегося стремления к отличному глобальному качеству обслуживания видео.

Пограничный сервер

Наши клиенты отправляют объекты JSON в кодировке base64 на наш пограничный сервер, используя HTTP GET. Пограничный сервер состоит из экземпляра nginx и демона ведения журнала, написанного на Golang. Экземпляр nginx просто передает HTTP-запрос демону ведения журнала, который, в свою очередь, записывает HTTP-запрос в файл. Каждые 100 МБ зарегистрированных данных мы загружаем журнал в S3 и отправляем сообщение SQS с данными о файле в очередь, которую прослушивает процессор.

Пример пакета данных от клиента.

Эти пограничные серверы находятся за ELB и настроены как группа автоматического масштабирования. Мы увеличиваем и уменьшаем масштаб в зависимости от общего уровня трафика, поступающего в кластер. Нашей главной задачей является обеспечение того, чтобы мы не потеряли слишком много данных в случае выхода из строя какой-либо машины в AWS [1].

Демон ведения журналов, исходный код которого вскоре будет открыт, можно настроить на ротацию файлов журналов по двум критериям:

  1. время создания
  2. размер файла

Это позволяет нам свести к минимуму риск потери данных даже во время отлива, хотя при ротации 100 МБ мы никогда не делаем ротацию во время создания. Когда файл ротируется, он сигнализирует загрузчику в процессе загрузки файла в корзину S3 и публикует соответствующее сообщение SQS.

Процессор

Процессор представляет собой демон, написанный на Golang, который прослушивает очередь SQS, опубликованную с пограничного уровня.

Для каждого входящего файла процессор распаковывает данные и, предполагая, что они верны, извлекает из него данные и записывает их в целевой файл. Один файл для каждого типа событий, которые у нас есть. Этот процессор использует ту же библиотеку ведения журналов, что и пограничный сервер, однако он использует либо 5 часов, либо 1 ГБ данных. Процессор продолжит запись в тот же выходной файл до тех пор, пока он не будет убран, что еще раз инициирует загрузку S3 и сопутствующее сообщение SQS.

Целевой файл представляет собой сжатый gzip TSV [2] со строкой на каждый обработанный пакет. Последующие прогоны между ротациями будут записывать в тот же файл; gzip поддерживает эту форму добавления данных в файл (попробуйте объединить два gzip-файла в один, а затем zcatting!). Порядок столбцов в TSV имеет решающее значение, поскольку он должен соответствовать порядку столбцов целевой таблицы. Заказ сообщается сервером «схемы», который у нас есть. Это будет обсуждаться в разделе «Хранилище» этого поста.

Пример преобразования. Не великий :)

Координатор загрузки

Как и уровень обработки, этот уровень получает сообщения SQS и инициирует импорт в наш уровень хранения. Мы используем Redshift для хранения наших данных, внутри Redshift у вас есть многочисленные способы импорта данных; мы используем команду COPY, поскольку она может читать из S3 и поддерживает чтение в файлах, сжатых gzip. Наше решение хранить файлы в TSV связано с желанием быть совместимым с Redshift COPY.

Сам процесс загрузки — это область, в которой у нас есть возможности для значительного улучшения, в частности, Redshift поддерживает файлы манифеста, которые могут значительно повысить производительность импорта. В настоящее время нам требуется около 3 часов в день для приема 24-часовых данных, поскольку мы расширяемся, нам нужно улучшить эту скорость, опция файла манифеста позволяет каждому узлу нести ответственность за файл по сравнению с нашей текущей стратегией одного файла в время на команду COPY.

Пример

Место хранения

У нас есть кластер Redshift с четырьмя узлами dw1.xlarge, в котором у нас есть таблица для каждого типа события. В каждой таблице есть столбец для каждого свойства. Redshift позволяет вам указать ключ распределения, который используется во время импорта, чтобы гарантировать, что данные, к которым вы собираетесь присоединиться, хранятся на том же узле — это минимизирует работу узла-лидера и сетевой трафик, тем самым значительно повышая производительность соединения. В нашем случае мы распределяем по полупостоянному [3] идентификатору клиента и сортируем по отметке времени.

Наш сервер схемы координирует уровень процессора и уровень хранения. Поскольку в TSV нет строки заголовка, порядок записей имеет решающее значение и должен соответствовать целевой таблице. Уровень процессора кэширует версию текущей известной схемы на несколько минут и по мере обработки данных выдает TSV с данными в правильном порядке столбцов. Каждый раз, когда необходимо зарегистрировать новое событие, мы используем сервер схемы для выполнения необходимых команд CREATE TABLE в Redshift. Если к событию необходимо добавить новое свойство, сервер схемы выдает команду ALTER TABLE для добавления соответствующего столбца. Наш приемник использует модификатор FILLRECORD для команды COPY, который предотвращает нарушение импорта этими ALTER. Эта структура позволяет нам быстро регистрировать новые и более подробные данные, которые помогают нашим продуктовым командам быстро принимать решения. Наша краткосрочная цель - сделать сервер схемы действительно пуленепробиваемым, чтобы нам не нужно было быть частью этого потока.

Вывод

Этот дизайн позволяет нам достигать ~ 100 000 вставок в секунду. Предыдущие версии конвейера ограничивались гораздо меньшими возможностями. У нас есть амбициозные планы на будущее. В частности, создание потока ETL, который позволяет нам создавать более богатый набор данных, объединяя наши текущие данные о событиях в нечто, что рассказывает более целостную историю людей, использующих наш продукт. Простой пример интересующих нас данных: какие первоначальные рефереры превращаются в самых лояльных пользователей и как QoS влияет на эту лояльность; хотя это звучит просто, существует очень интересная задача агрегирования лояльности по каждому рефереру, а также вычисление начального реферера — берем ли мы весь набор данных и запускаем его через задание Map Reduce? Как мы это делаем? Какие промежуточные данные мы храним, чтобы этот запрос не стал мучительно длинным в недалеком будущем? Что произойдет, если мы загрузим данные из прошлого, которые могли не быть загружены из-за ошибки и которые изменяют первоначальный реферер для ряда пользователей?

Если вам интересно узнать больше или у вас есть четкое представление о том, где мы можем связать наши проблемы, пожалуйста, не стесняйтесь обращаться к нам, подробности можно найти на странице нашей команды. Кроме того, не стесняйтесь комментировать/задавать нам вопросы в ветке Hacker News.

[0] В идеале мы хотели бы, чтобы CLOUD_ENVIRONMENT была «интеграцией», но Асгард делает это сложным; они эффективно сопоставляют CLOUD_ENVIRONMENT с учетной записью AWS — в данном случае это означает, что для интеграции нам понадобится другая учетная запись AWS. Это не очень хорошо сочетается с консолью управления AWS. Использование CLOUD_DEV_PHASE здесь показалось разумным компромиссом.

[1] В настоящее время мы работаем только в одной зоне доступности AWS, в ближайшее время мы переместимся на передовые рубежи на восточном и западном побережье США, а также в ЕС и, возможно, в Сан-Паулу.

[2] Мы ищем альтернативы для этого. Redshift изначально понимает этот формат, но он неудобен при выполнении заданий по уменьшению карты, поскольку gzip не может быть легко разделен.

[3] На самом деле поразительно, насколько непостоянно постоянное хранилище. Например, некоторые консоли не позволяют приложениям сохранять данные, поэтому вместо этого вы должны вычислять данные согласованным образом. Конечно, есть также ситуация с UUID iOS.

Приложение. Следует отметить: Asgard и облачная конфигурация

В основе нашей работы лежит Asgard, который при полной настройке инициализирует пользовательские данные на ваших инстансах EC2 по мере их поступления. Мы используем отличную библиотеку goamz от CrowdMob для получения этих пользовательских данных и принятия решений во время выполнения о том, куда данные должны поступать. Например, на наших тестовых серверах интеграции установлен набор CLOUD_DEV_PHASE, который мы используем для переопределения CLOUD_ENVIRONMENT [0], который настраивает, какие очереди SQS и корзины S3 мы используем на каждом этапе конвейера.

Spade — это наше внутреннее имя конвейера. Это противоречит моей религии называть вещи своими именами!