Модель рекомендаций с использованием TensorFlow и TensorFlow Transform

Обновление за апрель 2020 г. Обратите внимание, что сейчас существует гораздо более простой способ сделать это. Прочтите эту статью о построении модели рекомендаций с использованием BigQuery ML.

В этой статье я расскажу, как использовать API-интерфейс TensorFlow Estimator для создания модели совместной фильтрации WALS для рекомендаций по продуктам. Недавно мой коллега Лукман Рэмси опубликовал серию решений, подробно описывающих, как построить модель рекомендаций - прочтите эти решения, чтобы узнать, что такое рекомендации и как настроить сквозную систему.

В этой статье я заменяю использование Pandas в исходном решении на Apache Beam - это упростит масштабирование решения до более крупных наборов данных. Поскольку в решении существует контекст, я просто углублюсь в технические детали здесь. Полный исходный код находится на GitHub.

Шаг 1. Извлеките необработанные данные

Для совместной фильтрации нам не нужно знать никаких атрибутов ни о пользователях, ни о контенте. По сути, все, что нам нужно знать, - это userId, itemId и рейтинг, который конкретный пользователь дал конкретному элементу. В этом случае мы можем использовать время, проведенное на странице, как прокси для оценки. Google Analytics 360 экспортирует информацию о веб-трафике в BigQuery, и именно из BigQuery я извлекаю данные:

#standardSQL
WITH visitor_page_content AS (

   SELECT  
     fullVisitorID,
     (SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS latestContentId,  
     (LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) - hits.time) AS session_duration 
   FROM `cloud-training-demos.GA360_test.ga_sessions_sample`,   
     UNNEST(hits) AS hits
   WHERE 
     # only include hits on pages
      hits.type = "PAGE"

   GROUP BY   
     fullVisitorId, latestContentId, hits.time
     )

# aggregate web stats
SELECT   
  fullVisitorID as visitorId,
  latestContentId as contentId,
  SUM(session_duration) AS session_duration 
 
FROM visitor_page_content
  WHERE latestContentId IS NOT NULL 
  GROUP BY fullVisitorID, latestContentId
  HAVING session_duration > 0
  ORDER BY latestContentId

Сам запрос зависит от способа, которым газета настроила Google Analytics - в частности, от того, как они настраивали специальные параметры - вам, возможно, придется использовать другой запрос для извлечения ваших данных во что-то похожее на эту таблицу:

Это необработанный набор данных, который необходим для совместной фильтрации. Очевидно, что то, что вы будете использовать для visitorId, contentId и рейтингов, будет зависеть от вашей проблемы. Помимо этого, все остальное довольно стандартно, и вы можете использовать его как есть.

Шаг 2. Создайте пронумерованные идентификаторы пользователей и элементов

Алгоритм WALS требует, чтобы идентификаторы пользователей и элементов были перечислены, то есть они должны быть просто номером строки и номером столбца в матрице взаимодействий. Итак, нам нужно взять visitorId выше, который является строкой, и сопоставить их с 0, 1, 2,…. То же самое нужно проделать и с идентификаторами предметов. Кроме того, рейтинг должен быть небольшим числом, обычно 0–1. Итак, нам нужно масштабировать session_duration.

Для этого сопоставления мы будем использовать TensorFlow Transform (TFT) - это библиотека, которая позволяет вам создавать предварительно обработанные наборы данных с использованием Apache Beam для обучения, а затем применять эту предварительную обработку как часть вашего графика TensorFlow во время вывода!

Вот суть моей функции предварительной обработки с использованием TFT:

def preprocess_tft(rowdict):
    median = 57937
    result = {
      'userId' : tft.string_to_int(rowdict['visitorId'], vocab_filename='vocab_users'),
      'itemId' : tft.string_to_int(rowdict['contentId'], vocab_filename='vocab_items'),
      'rating' : 0.3 * (1 + (rowdict['session_duration'] - median)/median)
    }
    # cap the rating at 1.0
    result['rating'] = tf.where(tf.less(result['rating'], tf.ones(tf.shape(result['rating']))),
                               result['rating'], tf.ones(tf.shape(result['rating'])))
    return result

Результатом предварительной обработки строки из BigQuery, состоящей из visitorId, contentId и session_duration, является словарь Python с именем result, который содержит три столбца: userId, itemId и rating.

tft.string_to_int просматривает весь обучающий набор данных и создает отображение для перечисления посетителей и записывает отображение («словарь») в файл vocab_users. Я делаю то же самое для contentId, создавая itemId. Рейтинг получается путем масштабирования session_duration в пределах 0–1. Мое масштабирование по сути отсекает длинный хвост чрезвычайно длительных сеансов, которые, вероятно, представляют людей, которые закрывают свои ноутбуки, когда читают газетную статью. Важно отметить, что я делаю такое отсечение, используя чистые функции TensorFlow, такие как tf.less и tf.ones. Это важно, потому что эта функция предварительной обработки должна применяться во время вывода (прогнозирования) как часть обслуживающего графа TensorFlow.

Функция предварительной обработки применяется к набору обучающих данных с помощью Apache Beam:

transformed_dataset, transform_fn = (
          raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))

Шаг 3. Запишите набор данных для обучения WALS

Обучающий набор для WALS состоит из двух файлов: один предоставляет все элементы, оцененные конкретным пользователем (матрица взаимодействия по строкам), а другой предоставляет всех пользователей, которые оценили конкретный элемент (матрица взаимодействия по столбцам). Очевидно, что эти два файла содержат одни и те же данные, но необходимо разделить набор данных, чтобы их можно было обрабатывать параллельно. Мы можем сделать это также в том же конвейере Apache Beam, где мы выполняли перечисление:

users_for_item = (transformed_data
    | 'map_items' >> beam.Map(lambda x : (x['itemId'], x))
    | 'group_items' >> beam.GroupByKey()
    | 'totfr_items' >> beam.Map(lambda item_userlist : to_tfrecord(item_userlist, 'userId')))

Затем мы можем запустить конвейер Apache Beam в Cloud Dataflow. Это полностью управляемый сервис, поэтому нам не придется возиться с настройкой инфраструктуры и установкой программного обеспечения (полный код см. В Блокноте в GitHub).

На данный момент у нас будут следующие файлы:

items_for_user-00000-of-00003
...
users_for_item-00000-of-00004
...
transform_fn/transform_fn/saved_model.pb
transform_fn/transform_fn/assets/
transform_fn/transform_fn/assets/vocab_items
transform_fn/transform_fn/assets/vocab_users
  1. `` users_for_item`` содержит всех пользователей / рейтинги для каждого элемента в формате TFExample. Элементы и пользователи здесь являются целыми числами (а не строками), то есть itemId, а не contentId, и userId, не visitorId. Рейтинг масштабируется.
  2. items_for_user содержит все элементы / рейтинги для каждого пользователя в формате TFExample. Элементы и пользователи здесь являются целыми числами (а не строками), то есть itemId, а не contentId, и userId, а не visitorId. Рейтинг масштабируется.
  3. `` vocab_items`` содержит отображение от contentId до перечисляемого itemId
  4. `` vocab_users`` содержит сопоставление visitorId с перечисленным userId
  5. save_model.pb содержит все преобразования тензорного потока, которые мы сделали во время предварительной обработки, так что их можно применять и во время прогнозирования.

Шаг 4: напишите код TensorFlow

В TensorFlow есть реализация WALS на основе API-интерфейса Estimator. Мы используем его так же, как и любой другой Оценщик - см. Функции read_dataset () и train_and_evaluate () в репозитории GitHub.

Более интересным является то, как мы используем обученный оценщик для пакетного прогнозирования. Для конкретного пользователя мы хотим найти наиболее популярные K элементов. Это можно сделать в TensorFlow, используя:

def find_top_k(user, item_factors, k):
  all_items = tf.matmul(tf.expand_dims(user, 0), tf.transpose(item_factors))
  topk = tf.nn.top_k(all_items, k=k)
  return tf.cast(topk.indices, dtype=tf.int64)

Пакетное прогнозирование включает вызов вышеуказанной функции для каждого пользователя, но при этом необходимо убедиться, что при записи вывода мы записываем строку visitorId, а не число userId (и аналогично для contentId / userId):

def batch_predict(args):
  import numpy as np
  
  # read vocabulary into Python list for quick index-ed lookup
  def create_lookup(filename):
      from tensorflow.python.lib.io import file_io
      dirname = os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/')
      with file_io.FileIO(os.path.join(dirname, filename), mode='r') as ifp:
        return [x.rstrip() for x in ifp]
  originalItemIds = create_lookup('vocab_items')
  originalUserIds = create_lookup('vocab_users')
  
  with tf.Session() as sess:
    estimator = tf.contrib.factorization.WALSMatrixFactorization(
                         num_rows=args['nusers'], num_cols=args['nitems'],
                         embedding_dimension=args['n_embeds'],
                         model_dir=args['output_dir'])
           
    # but for in-vocab data, the row factors are already in the checkpoint
    user_factors = tf.convert_to_tensor(estimator.get_row_factors()[0]) # (nusers, nembeds)
    # in either case, we have to assume catalog doesn't change, so col_factors are read in
    item_factors = tf.convert_to_tensor(estimator.get_col_factors()[0])# (nitems, nembeds)
    
    # for each user, find the top K items
    topk = tf.squeeze(tf.map_fn(lambda user: find_top_k(user, item_factors, args['topk']), 
                                user_factors, dtype=tf.int64))
    with file_io.FileIO(os.path.join(args['output_dir'], 'batch_pred.txt'), mode='w') as f:
      for userId, best_items_for_user in enumerate(topk.eval()):
        f.write(originalUserIds[userId] + '\t') # write userId \t item1,item2,item3...
        f.write(','.join(originalItemIds[itemId] for itemId in best_items_for_user) + '\n')

Чтобы выполнить обучение и пакетное прогнозирование, мы можем запустить модель TensorFlow в Cloud ML Engine, опять же без возни с какой-либо инфраструктурой:

gcloud ml-engine jobs submit training $JOBNAME \
   --region=$REGION \
   --module-name=trainer.task \
   --package-path=${PWD}/wals_tft/trainer \
   --job-dir=$OUTDIR \
   --staging-bucket=gs://$BUCKET \
   --scale-tier=BASIC_GPU \
   --runtime-version=1.5 \
   -- \
   --output_dir=$OUTDIR \
   --input_path=gs://${BUCKET}/wals/preproc_tft \
   --num_epochs=10 --nitems=5668 --nusers=82802

Как-то уродливо жестко кодировать такие элементы и пользователей. Итак, мы можем вернуться к нашему конвейеру Beam и заставить его записывать nitems и nusers также в файлы, а затем просто выполнить «gsutil cat», чтобы получить соответствующие значения - это делает полный код на GitHub.

Вот пример того, как выглядит результат:

6167894456739729438	298997422,262707977,263058146
3795498541234027150	296993188,97034003,298989783

По сути, вы получаете 3 элемента для каждого посетителя.

Шаг 5. Коэффициенты строк и столбцов

Хотя составление рекомендаций по продуктам является ключевым вариантом использования WALS, другим вариантом использования является поиск низкоразмерных способов представления продуктов и пользователей, например, для сегментации продуктов или клиентов путем кластеризации факторов элементов и факторов столбцов. Итак, мы реализуем функцию обслуживания, чтобы предоставить их вызывающим абонентам (опять же, полный код см. На GitHub):

def for_user_embeddings(originalUserId):
      # convert the userId that the end-user provided to integer
      originalUserIds = tf.contrib.lookup.index_table_from_file(
          os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/vocab_users'))
      userId = originalUserIds.lookup(originalUserId)
      
      # all items for this user (for user_embeddings)
      items = tf.range(args['nitems'], dtype=tf.int64)
      users = userId * tf.ones([args['nitems']], dtype=tf.int64)
      ratings = 0.1 * tf.ones_like(users, dtype=tf.float32)
      return items, users, ratings, tf.constant(True)

Оркестровка

Обратите внимание, что в этой статье речь идет только о замене частей обучения машинному обучению и пакетного прогнозирования исходного решения. В исходном решении также объясняется, как выполнять оркестровку и фильтрацию. Где они вписываются?

На данный момент у нас есть запрос BigQuery, конвейер Beam / Dataflow и, возможно, приложение AppEngine (см. Ниже). Как вы запускаете их периодически один за другим? Используйте Apache Airflow, как предложено в решении для этой оркестровки.

Фильтрация

Если вы рекомендуете шоколадные конфеты клиентам, то можно рекомендовать шоколад, который они уже пробовали, но если вы рекомендуете пользователям газетные статьи, то важно воздерживаться от рекомендаций статей, которые они уже прочитали.

Мой код пакетного прогнозирования, в отличие от исходного решения, не отфильтровывает статьи, которые пользователь уже прочитал. Если важно, чтобы рекомендации не включали уже прочитанные / приобретенные элементы, то это можно сделать двумя способами.

Более простой метод - обнулить записи, соответствующие уже прочитанным элементам (здесь элементы с рейтингом ‹0,01), перед нахождением top_k:

def find_top_k(user, item_factors, read_items, k):
  all_items = tf.matmul(tf.expand_dims(user, 0), 
                        tf.transpose(item_factors))
  all_items = tf.where(tf.less(read_items, 
                               0.01*tf.ones(tf.shape(read_items))),
                       all_items,
                       tf.zeros(tf.shape(all_items)))
  topk = tf.nn.top_k(all_items, k=k)
  return tf.cast(topk.indices, dtype=tf.int64)

Проблема заключается в задержке - вы можете не рекомендовать элементы, которые пользователь прочитал вчера (потому что они есть в вашем наборе обучающих данных), но код пакетного прогнозирования имеет доступ к потоку прочитанных статей в режиме реального времени, поэтому вы будете рекомендовать статьи, которые они прочитали несколько минут назад.

Если это отставание - проблема, которую вы хотите избежать, вам следует сделать k в прогнозировании партии намного выше (так, чтобы, например, вы получали 20 статей из рекомендателя, даже если вы собираетесь рекомендовать только 5 из них. ), а затем выполните второй уровень фильтрации в AppEngine, как предложено в исходном решении.

Резюме

Теперь у вас есть пакетное прогнозирование, онлайн-прогнозирование и обучение - и все это без настройки какого-либо кластера! Кроме того, TensorFlow Transform позволил нам упростить вычисление метаданных и сопоставление элементов / пользователей в соответствии с парадигмой WALS.

Спасибо моим коллегам Лукману Рэмси и Илянгу Чжао за полезные комментарии и предложения по этой статье.