Вторая из трех частей серии, в которой мы создаем функции прогнозирования и передаем данные в модель контролируемого машинного обучения.

Введение

Это вторая часть серии из трех частей, в которой я описываю подход к прогнозированию оттока пользователей с помощью pyspark. Найдите первую часть серии здесь.

В этой статье мы используем знания, полученные на этапе исследования, для создания некоторых функций прогнозирования и передачи данных в модель контролируемого машинного обучения.

Инженерные особенности

Прежде чем мы сможем разработать некоторые функции, было бы неплохо получить представление о свойствах пользователей, которые ушли, по сравнению с теми, кто этого не сделал. Ниже я разделяю набор данных и показываю таблицу с общей статистикой для каждого:

def print_relevant_stats(df, total_rows, total_users):
    """"Print high level statistics for the dataframe"""
    nrows = df.count()
    nusers = df.select('userId').dropDuplicates().count()
    npaid = df.groupby('userId', 'level').count().where(
        'level == "paid"').count()
    print(f'Proportion of the total rows in this group: '
          f'{nrows / total_rows:.2f}')
    print(f'Proportion of the total users in this group: '
          f'{nusers / total_users:.2f}')
    print(f'Proportion of users that paid for the service '
          f'at some point: {npaid / nusers:.2f}')
    print(f'Proportion of users that have never paid for the '
          f'service: {(nusers - npaid) / nusers:.2f}')
    
    # The average amount of songs played can be found by grabbing 
    # the rows with a song defined and then just grouping by userId   
    # to get counts for the calculation
    avg_agg = df.where('song is not null') \
        .groupby('userId') \
        .count() \
        .select(
            sqlF.mean(sqlF.col('count')).alias('avg_songs_played')
        ).head(1)[0]
    print(f'Avg amount of songs played per user:'       
          f'{avg_agg.avg_songs_played:.2f}')
    
    # The average amount of sessions can be calculated from getting 
    # unique duplicate pairs of user and session ids and then 
    # grouping by user to get the counts for the average calculation
    avg_agg = df.select('userId', 'sessionId') \
        .dropDuplicates() \
        .groupby('userId') \
        .count() \
        .select(
            sqlF.mean(sqlF.col('count')).alias('avg_sessions')
        ).head(1)[0]
    print(f'Avg number of sessions per user:' 
          f'{avg_agg.avg_sessions:.2f}')
    
    # Getting the average actions and lengths per session requires 
    # us to find the particular value for each one, which we can get  
    # from the max in each group. Then we can proceed to get avg per 
    # user, and then a final average across all users
    current_window = Window.partitionBy(
        'userId', 'sessionId').orderBy('ts')
    avg_agg = df.withColumn(
            'sessionLen', 
            (sqlF.col('ts') - sqlF.first(sqlF.col('ts')) \
                                  .over(current_window)) / 1000.0
        ).groupby('userId', 'sessionId') \
        .max() \
        .groupby('userId') \
        .avg() \
        .select(
            sqlF.mean(
                sqlF.col('avg(max(itemInSession))'
            )).alias('avg_user_actions'),                
            sqlF.mean(
                sqlF.col('avg(max(sessionLen))'
            )).alias('avg_session_len'),
        ).head(1)[0]
    print(f'Avg number of actions per session:' 
          f'{avg_agg.avg_user_actions:.2f}')
    print(f'Avg duration of sessions (in hours):'
          f'{avg_agg.avg_session_len / (60 * 60):.2f}')
    
    # Getting the average amount of seconds since the first show of 
    # users is similar to the above, but this time we don't group by   
    # sessionId
    current_window = Window.partitionBy('userId').orderBy('ts')
    avg_agg = df.withColumn('secondsSinceFirstShow', 
        (sqlF.col('ts') -   sqlF.first(sqlF.col('ts')) \
                                .over(current_window)) / 1000.0
        ).groupby('userId') \
        .max() \
        .select(
            sqlF.mean(sqlF.col('max(secondsSinceFirstShow)')) \
                          .alias('avg_first_show'),
        ).head(1)[0]
    print(f'Avg number of hours since first show:'
          f'{avg_agg.avg_first_show / (60 * 60):.2f}')
    
    # Calculate not only page visit counts but also percentage of 
    # the total for each
    pages_agg = df.groupby('page').count()
    npages = pages_agg.select(
        sqlF.sum(sqlF.col('count')).alias('npages'),
    ).head(1)[0].npages
    per_page_agg = pages_agg.sort(sqlF.desc('count')) \
        .withColumn('proportion', sqlF.col('count') / npages)          
    print('Stats per page:')
    per_page_agg.show()
# Get some counts for next steps
total_rows = df.count()
total_users = df.select('userID').dropDuplicates().count()
print(f'Number of rows and users in dataset: '
      f'{total_rows}, {total_users}')
Number of rows and users in dataset: 286500, 226
print('Stats for set of non-churned users:')
print_relevant_stats(df.where('churned=0'), total_rows, total_users)
Stats for set of non-churned users:
Proportion of the total rows in this group: 0.84
Proportion of the total users in this group: 0.77
Proportion of users that paid for the service at some point: 0.75
Proportion of users that have never paid for the service: 0.25
Avg amount of songs played per user: 1108.17
Avg number of sessions per user: 24.53
Avg number of actions per session: 88.50
Avg duration of sessions (in hours): 5.48
Avg number of hours since first show: 1129.71
Stats per page:
+-------------------+------+--------------------+
|               page| count|          proportion|
+-------------------+------+--------------------+
|           NextSong|191714|  0.7933999900677051|
|               Home| 12785|0.052910162393020904|
|          Thumbs Up| 10692| 0.04424837358671721|
|    Add to Playlist|  5488|0.022711847572381597|
|         Add Friend|  3641|0.015068118988892383|
|              Login|  3241|0.013412736512771277|
|        Roll Advert|  2966|0.012274661060438015|
|             Logout|  2673|0.011062093396679303|
|        Thumbs Down|  2050|0.008483835190120678|
|          Downgrade|  1718|0.007109867734940158|
|               Help|  1487|0.006153884354980218|
|           Settings|  1244|0.005148239500736...|
|              About|   868|0.003592179973182804|
|            Upgrade|   387|0.001601582545647...|
|      Save Settings|   252|0.001042890959956298|
|              Error|   226|9.352910990084259E-4|
|     Submit Upgrade|   127|5.255839361684517E-4|
|   Submit Downgrade|    54|2.234766342763495...|
|           Register|    18|7.449221142544985E-5|
|Submit Registration|     5|2.069228095151385E-5|
+-------------------+------+--------------------+
print('Stats for set of churned users:')
print_relevant_stats(df.where('churned=1'), total_rows, total_users)
Stats for set of churned users:
Proportion of the total rows in this group: 0.16
Proportion of the total users in this group: 0.23
Proportion of users that paid for the service at some point: 0.69
Proportion of users that have never paid for the service: 0.31
Avg amount of songs played per user: 699.88
Avg number of sessions per user: 10.33
Avg number of actions per session: 78.94
Avg duration of sessions (in hours): 4.33
Avg number of hours since first show: 564.35
Stats per page:
+--------------------+-----+--------------------+
|                page|count|          proportion|
+--------------------+-----+--------------------+
|            NextSong|36394|  0.8112072039942939|
|           Thumbs Up| 1859| 0.04143634094151213|
|                Home| 1672|0.037268188302425106|
|     Add to Playlist| 1038|0.023136590584878745|
|         Roll Advert|  967|0.021554029957203995|
|          Add Friend|  636|0.014176176890156919|
|              Logout|  553| 0.01232614122681883|
|         Thumbs Down|  496|0.011055634807417974|
|           Downgrade|  337|0.007511590584878745|
|            Settings|  270|0.006018188302425107|
|                Help|  239|0.005327211126961484|
|             Upgrade|  112|0.002496433666191...|
|       Save Settings|   58|0.001292796005706134|
|               About|   56|0.001248216833095...|
|              Cancel|   52|0.001159058487874...|
|Cancellation Conf...|   52|0.001159058487874...|
|               Error|   32|7.132667617689016E-4|
|      Submit Upgrade|   32|7.132667617689016E-4|
|    Submit Downgrade|    9|2.006062767475035...|
+--------------------+-----+--------------------+

Результаты вышеизложенного довольно информативны! На высоком уровне похоже, что чем больше времени пользователь проводит на платформе, тем меньше вероятность того, что пользователь уйдет в ближайшем будущем.

Основываясь на вышеизложенном, я подумал, что хороший набор пользовательских функций для разработки будет следующим:

  • number_sessions: Общее количество сеансов
  • seconds_since_genesis: Общее количество секунд с момента первого появления
  • avg_actions_per_session: Среднее количество действий за сеанс
  • avg_seconds_per_session: Среднее количество секунд, затраченных за сеанс.

В настоящее время у каждого пользователя может быть несколько строк, поскольку они представляют собой отдельные действия в определенные моменты времени. Но что нам действительно нужно, так это иметь одну строку для каждого пользователя с упомянутыми функциями в дополнение к метке churned, которую мы хотим предсказать (мы уже добавили этот столбец в предыдущей статье).

В следующих подразделах я сосредоточусь на создании правильной функции для создания каждой функции для каждого пользователя, а затем, в конце, соберу все части воедино.

Общее количество сеансов

Это легко, нам просто нужно получить разные пары идентификаторов пользователей и сеансов, а затем сгруппировать по идентификаторам пользователей, чтобы мы могли извлечь нужные нам счетчики:

def add_feature_number_sessions(df):
    """Add `number_sessions`: amount of sessions per user"""
    counts_df = df.select('userId', 'sessionId') \
        .dropDuplicates() \
        .groupby('userId') \
        .count() \
        .withColumnRenamed('count', 'number_sessions')
    return df.join(counts_df, ['userId'])

Общее количество секунд с момента первого появления

Мы можем получить общее количество секунд с момента первого появления пользователя с помощью оконной функции, которая вычисляет дельту текущей метки времени до первой наблюдаемой при группировке по пользователю. Оттуда мы просто берем максимальное значение, полученное для каждого раздела, как окончательное значение:

def add_feature_seconds_since_genesis(df):
    """Add `seconds_since_genesis`: seconds since first appearance"""
    current_window = Window.partitionBy('userId').orderBy('ts')
    genesis_df = df.withColumn(
        'seconds_since_genesis', 
        (sqlF.col('ts') - sqlF.first(sqlF.col('ts'))
                              .over(current_window)) / 1000.0)
    genesis_df = genesis_df.groupby('userId') \
        .max() \
        .withColumnRenamed(
            'max(seconds_since_genesis)', 'seconds_since_genesis'
        ).select('userId', 'seconds_since_genesis')
    return df.join(genesis_df, ['userId'])

Среднее количество действий за сеанс

Чтобы рассчитать среднее количество действий сеанса на пользователя, нам нужно сначала сгруппировать как по пользователю, так и по идентификатору сеанса. Оттуда в столбце max itemInSession будет указано общее количество действий за сеанс. Наконец, мы можем взять среднее из них и получить желаемое значение:

def add_feature_avg_actions_per_session(df):
    """Add `avg_actions_per_session`: average actions per session"""
    current_window = Window.partitionBy('userId').orderBy('ts')
    avg_df = df.groupby('userId', 'sessionId') \
        .max() \
        .groupby('userId') \
        .avg() \
        .withColumnRenamed(
            'avg(max(itemInSession))', 'avg_actions_per_session'
        ).select('userId', 'avg_actions_per_session')
    return df.join(avg_df, ['userId'])

Среднее количество секунд, потраченных на сеанс

Чтобы рассчитать среднюю продолжительность сеанса на пользователя, мы делаем то же самое, что и в предыдущем случае, нам просто нужно построить промежуточный столбец в процессе, чтобы мы могли рассчитать продолжительность текущего сеанса:

def add_feature_avg_seconds_per_session(df):
    """Add `avg_seconds_per_session`: average session duration"""
    current_window = Window.partitionBy(
        'userId', 'sessionId').orderBy('ts')
    avg_df = df.withColumn(
            'sessionLen', 
            (sqlF.col('ts') - sqlF.first(sqlF.col('ts')) \
                                  .over(current_window)) / 1000.0
        ).groupby('userId', 'sessionId') \
        .max() \
        .groupby('userId') \
        .avg() \
        .withColumnRenamed(
            'avg(max(sessionLen))', 'avg_seconds_per_session'
        ).select('userId', 'avg_seconds_per_session')
    return df.join(avg_df, ['userId'])

Собираем все вместе

Теперь у нас есть все необходимое для создания функции, которая загружает набор данных, очищает его, добавляет функции и метки и сводит его к форме, в которой для каждого пользователя сохраняется одна строка только с функциями и прогнозирующей меткой:

def load_df_for_ml(json_filepath):
    """Load json, then cleans/transform it for modeling"""
    df = spark.read.json(json_filepath)
    df_clean_v1 = df.filter('userId != ""')
    df_with_features = add_feature_number_sessions(df_clean_v1)
    df_with_features = add_feature_seconds_since_genesis(
        df_with_features)                   
    df_with_features = add_feature_avg_actions_per_session(
        df_with_features)                   
    df_with_features = add_feature_avg_seconds_per_session(
        df_with_features)                   
    df_with_features = add_label_churned(
        df_with_features)
    
    features = [
        'number_sessions', 'seconds_since_genesis', 
        'avg_actions_per_session', 'avg_seconds_per_session',
    ]
    return df_with_features.select(
        ['userId', 'churned'] + features).dropDuplicates()
file_path = './mini_sparkify_event_data.json'
final_df = load_df_for_ml(file_path)
final_df.select(
    'userId', 'churned', 'number_sessions').sort('userId').show()
------+-------+---------------+
|userId|churned|number_sessions|
+------+-------+---------------+
|    10|      0|              6|
|   100|      0|             35|
|100001|      1|              4|
|100002|      0|              4|
|100003|      1|              2|
|100004|      0|             21|
|100005|      1|              5|
|100006|      1|              1|
|100007|      1|              9|
|100008|      0|              6|
|100009|      1|             10|
|100010|      0|              7|
|100011|      1|              1|
|100012|      1|              7|
|100013|      1|             14|
|100014|      1|              6|
|100015|      1|             12|
|100016|      0|              8|
|100017|      1|              1|
|100018|      0|             21|
+------+-------+---------------+
only showing top 20 rows

Модель здания

Наконец-то мы готовы запустить контролируемую модель машинного обучения для набора данных. Я думаю, что мы должны стремиться к модели с максимально возможной полнотой, учитывая, насколько мал отточенный набор (только 16% строк исходит от отточенных пользователей), а также потому, что я предполагаю, что ложный отрицательный результат , когда на самом деле они будут) более разрушительны, чем ложное срабатывание (прогнозирование того, что кто-то уйдет, когда на самом деле они этого не планировали).

Давайте настроим конвейер LogisticRegression:

def get_ml_pipeline(clf):
    """Constructs a pipeline to transform data before running clf"""
    features = [
        'number_sessions', 'seconds_since_genesis',
        'avg_actions_per_session', 'avg_seconds_per_session',
    ]
    assembler = VectorAssembler(
        inputCols=features, outputCol="features")
    return Pipeline(stages=[assembler, clf])
def eval_model(model, validation_df):
    """Runs a model against test set and prints performance stats"""
    results = model.transform(validation_df)    
    predictionAndLabels = results.rdd.map(
        lambda row: (float(row.prediction), float(row.label)))
    metrics = MulticlassMetrics(predictionAndLabels)
    print('Performance Stats')
    print(f'Accuracy: {metrics.accuracy:.4f}')
    print(f'Precision = {metrics.precision(1.0):.4f}')
    print(f'Recall = {metrics.recall(1.0):.4f}')
    print(f'F1 Score = {metrics.fMeasure(1.0):.4f}')
# Grab a random 80% of the dataset for the train set and the rest 
# for validation
train_df, validation_df = final_df.withColumnRenamed(
    'churned', 'label').randomSplit([0.8, 0.2], seed=42)
# Fit pipeline to the training dataset
pipeline = get_ml_pipeline(LogisticRegression(standardization=True))
model = pipeline.fit(train_df)
# Run model against the validation dataset and print performance 
# statistics
eval_model(model, validation_df)
Performance Stats
Accuracy: 0.7941
Precision = 0.6667
Recall = 0.2500
F1 Score = 0.3636

Мы хорошо справились с точностью и аккуратностью, но не так хорошо с запоминанием (что привело к низкому баллу F1). Попробуем уточнить нашу модель.

Уточнение модели

Чтобы уточнить нашу модель, мы должны изучить возможные комбинации значений для гиперпараметров классификатора LogisticRegression. PySpark предоставляет CrossValidator, который оценивает каждую комбинацию параметров с помощью k-кратной перекрестной проверки.

Я сосредоточусь на попытках комбинаций для следующих параметров LogisticRegression:

  • regParam: параметр регуляризации модели (›= 0). (по умолчанию: 0,0)
  • aggregationDepth: предлагаемая глубина для treeAggregate (больше или равна 2). Если размеры объектов или количество разделов велико, этот параметр можно настроить на больший размер. (по умолчанию: 2.0)
  • elasticNetParam: параметр микширования ElasticNet. Для 0 вводится штраф L2. Для 1 это штраф L1. (по умолчанию: 0,0)
  • maxIter: максимальное количество итераций подбора. (по умолчанию: 100,0)
def build_cross_validator(numFolds=3):
    """Build CrossValidator for tuning a LogisticRegression model"""
    lr = LogisticRegression(standardization=True)
    pipeline = get_ml_pipeline(lr)
    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.0, 0.5]) \
        .addGrid(lr.aggregationDepth, [2, 4]) \
        .addGrid(lr.elasticNetParam, [0.0, 1.0]) \
        .addGrid(lr.maxIter, [10, 100]) \
        .build()
    evaluator = MulticlassClassificationEvaluator()    
    return CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=numFolds)
# Tune the mode using a k-fold cross-validation based approach to 
# try to find better hyperparameters 
cv = build_cross_validator()
cv_model = cv.fit(train_df)
# Evaluate the performance of the cross-validated model
eval_model(cv_model, validation_df)
Performance Stats
Accuracy: 0.7941
Precision = 0.6667
Recall = 0.2500
F1 Score = 0.3636

На самом деле это не улучшило ситуацию, поэтому значения по умолчанию уже хорошо работают с текущим разделением поезд-тест. В среднем наборе данных всего 225 различных пользователей, поэтому я уверен, что часть проблемы заключается в размере выборки. Мы могли бы попробовать другие методы выборки (поскольку взбалтываемый набор невелик), или, возможно, другие модели машинного обучения или разработку дополнительных функций, но я думаю, что 25% отзыва достаточно, чтобы перейти к более крупному набору данных.

В следующей статье…

Теперь, когда у нас есть рабочий конвейер прогнозирования, в следующей (и последней) статье серии я расскажу вам, как настроить кластер AWS EMR для обучения и оценки нашей модели с набором данных объемом 12 ГБ. Затем я поделюсь своими мыслями о результатах и ​​предложу возможные дальнейшие шаги.

Посетите мой репозиторий на github, если вас интересует фактический код, который можно использовать для воспроизведения результатов всей работы.