В различных областях нам нужна эффективная система оповещения и мониторинга для выявления потенциальных аномалий, мошенничества или экстремальных значений.
В этом уроке мы объясним вам, как создать хорошую систему оповещения в реальном времени.

Весь скрипт доступен там: https://github.com/Faouzizi/Alerting-System

И. Введение

Когда вы работаете с временными рядами, такими как ряды продаж, количество транзакций, сгенерированный CA или что-то в этом роде, вы часто сталкиваетесь с очень неприятной проблемой: необнаруженной аномалией.

Итак, в этом коротком уроке я покажу вам простой способ создать и настроить собственную систему оповещения.

Все наши скрипты находятся там: https://github.com/Faouzizi/Alerting-System

II. Смоделируйте временной ряд

На первом этапе нам нужно импортировать данные с сервера MySQL или другого источника данных:

####################################################################
###########               Import python packages
####################################################################
import pandas as pd
import mysql.connector as MySQLdb
import config

####################################################################
###########               Import python packages
####################################################################
def import_data(req, db_name):
    try:
        conn = MySQLdb.connect(user=config.mysql_user, host=config.mysql_host, password=config.mysql_password, db=config.mysql_db)
        print('Connection ok')
    except:
        print("I am unable to connect to the database")
    try:
        cur = conn.cursor()
        cur.execute(req)
        results = cur.fetchall()
    finally:
        conn.close()
    data = pd.DataFrame(list(results), columns=[row[0] for row in cur.description]).reset_index(drop=True)
    return (data)

После этого нам нужно обработать импортированный временной ряд и смоделировать его.
Затем мы можем предсказать следующее значение временного ряда с 95% IC, если реальное значение не находится в этом IC, поэтому обнаруживается аномалия, и мы отправляем предупреждающее электронное письмо, иначе ничего не делаем.

а. Обработка временных рядов

Первым шагом нашего моделирования является обработка нашего временного ряда путем обработки экстремальных значений.
Поэтому мы определим экстремальные значения и заменим их более согласованными значениями. В нашем случае мы собираемся заменить их значением, которое является средним значением предыдущего и следующего значений (вы можете использовать более сложную модель, если хотите).

Для определения экстремальных значений будем использовать квантили: пусть Q1 и Q3 будут соответственно первым и третьим квартилями, тогда мы можем определить экстремальное значение как любое значение, находящееся вне интервала [Q1 — k * (Q3-Q1); Q1 + k * (Q3-Q1)], где k — положительное целое число.

def traiter_valeurs_extremes_continues(df, variable):
    ###############################################################
    # This function allow you to treat extreme values by replacing 
    # them by mean
    ###############################################################
    q1 = df[variable].quantile([0.25]).values[0]
    q3 = df[variable].quantile([0.75]).values[0]
    IC_valeur_non_aberantes = [q1 - 2*(q3-q1), q3 + 2*(q3-q1)]
    df.loc[df[variable]<IC_valeur_non_aberantes[0], variable] = 
                                               df[variable].mean()
    df.loc[df[variable]>IC_valeur_non_aberantes[1], variable] = 
                                               df[variable].mean()
    return(df[variable])

После этого наш временной ряд готов к моделированию:

б. Создайте модель

В этом разделе мы будем использовать пакет Prophet, чтобы подогнать нашу модель и использовать его для прогнозирования следующего значения (вы можете использовать любую другую модель ML).

Шаг 1.
На этом шаге мы построим нашу модель с помощью Prophet, поэтому начнем с переименования столбцов следующим образом:

# we rename variables for Prophet package
df_temp.rename(columns={'sellTime':'ds','CA':'y'}, inplace=True)

Затем мы строим нашу модель:

model_fb = Prophet()

Шаг 2.
На этом шаге мы подгоним нашу модель без последнего значения. Мы не используем последнее значение, потому что хотим знать, является ли оно экстремальным значением (аномалией) или нет.

# an extreme value or not
    model_fb.fit(df_temp[['ds','y']][:-1])

Шаг 3.
Предскажите значение последнего значения и определите доверительный интервал:

# we predict the next value using our model
future = model_fb.make_future_dataframe(periods=1, freq='5min')
forecast = model_fb.predict(future)
# there we have our confidence intervall 
df = pd.merge(df,forecast[['ds','yhat','yhat_lower', 'yhat_upper']], how='inner', left_on='sellTime', right_on='ds')

Здесь мы видим компоненты прогноза по методу Prophet.plot_components:

fig2 = model_fb.plot_components(forecast)

Шаг 4.
Для последнего значения проверьте, находится ли реальное значение в доверительном интервале или нет. Если реальное значение выходит за этот интервал, мы обнаруживаем аномалию и должны отправить предупреждение.
У нас может быть 2 типа аномалии: большой объем или низкий объем.

# we create 2 new variables
df['alert_high'] = 0
df['alert_low'] = 0
# dectect the anomaly if the last value is out of IC
df.loc[df.shape[0]-1,'alert_high'] = 1 if df['CA'].iloc[-1] >    
                                    df['yhat_upper'].iloc[-1] else 0
df.loc[df.shape[0]-1,'alert_low'] = 1 if df['CA'].iloc[-1]< 
                                   df['yhat_lower'].iloc[-1] else 0

Шаг 5.
Если мы обнаружим аномалию, последним шагом будет информирование ответственной команды по электронной почте.

в. Отправить электронное письмо

Цель этого раздела — объяснить, как мы можем отправить электронное письмо ответственной группе с графиками временных рядов при обнаружении аномалии.
Графики покажут важность аномалии.

Шаг 1. Постройте временные ряды и сохраните их

#############################################################################
###########               Import python packages
#############################################################################
import matplotlib.pyplot as plt
from matplotlib.ticker import (MultipleLocator, FormatStrFormatter, AutoMinorLocator)
import matplotlib.dates as mdates
import matplotlib.ticker as ticker

#############################################################################
###########               Plot time series using last 24 or 48 hour
#############################################################################
def plot_figure(nb_hour, df):
    if nb_hour==24:
        df_24 = df[-nb_hour*12:]
        fig, ax = plt.subplots()
        ax.xaxis.grid(True, which='minor')
        ax.plot(df_24.index, df_24)
        ax.set_xlabel('Time')
        ax.set_ylabel('CA')
        ax.xaxis.set_major_locator(MultipleLocator(0.5))
        ax.xaxis.set_major_formatter(mdates.DateFormatter('\n %d-%m'))
        ax.xaxis.set_minor_locator(MultipleLocator(0.07))
        ax.xaxis.set_minor_formatter(mdates.DateFormatter('%Hh'))
        plt.tight_layout()
        plt.savefig('./plots/plot_24h.png')
    else:
        df_48 = df[-nb_hour*12:]
        fig, ax = plt.subplots()
        ax.xaxis.grid(True, which='minor')
        ax.plot(df_48.index, df_48)
        ax.set_xlabel('Time')
        ax.set_ylabel('CA')
        ax.xaxis.set_major_locator(MultipleLocator(1))
        ax.xaxis.set_major_formatter(mdates.DateFormatter('\n %d-%m'))
        ax.xaxis.set_minor_locator(MultipleLocator(0.17))
        ax.xaxis.set_minor_formatter(mdates.DateFormatter('%Hh'))
        plt.tight_layout()
        plt.savefig('./plots/plot_48h.png')

Шаг 2. Создайте HTML-шаблон, включающий содержимое электронной почты
Там мы адаптируем шаблон в соответствии с типом аномалии. Если у нас низкий объем, заголовок будет «Низкий объем продаж», а цвет будет оранжевым, а если аномалия — большой объем, заголовок будет «Высокий объем продаж», а цвет будет синим.

######################################################################### Import python packages ####################################################################
from string import Template
import base64 
######################################################################### Build the email content using html
####################################################################def get_html_template(type_alert):
    
    # Import of time series plots
    data24h_uri = base64.b64encode(open('./plots/plot_24h.png', 
                                   'rb').read()).decode('utf-8')
    data48h_uri = base64.b64encode(open('./plots/plot_48h.png', 
                                   'rb').read()).decode('utf-8')
    img24h_tag = '<img src="data:image/png;base64,
                                         {0}">'.format(data24h_uri)
    img48h_tag = '<img src="data:image/png;base64,{0}"  
                                height="auto"> '.format(data48h_uri)

    # Choice of the email color, orange for low sales and blue for 
    # high sales
    if type_alert=='Low':
        color = '#F7F8E0'
    else:
        color = '#33E3FF'

    # Html email contents
    message = """
    <!DOCTYPE html>
      <html>
    
        <head>
          <title>Volume Alert</title>
        </head>
    
    
    
        <body  style="background-color: #F7F8E0">
    
    
          <header style="background-color: """+color+"""">
            <h1 >"""+type_alert+""" Sales Volume</h1>
          </header>
    
          <div style="background-color: """+color+""";">
            <br>
            <div style="position: relative; top: 20px; left: 30px;">
              """+img24h_tag+"""
            </div>
            <h2>Last 24 hours</h2>
          </div>
    
    
          <div style="background-color: """+color+""";">
            <br>
            <div style="position: relative; top: 20px; left: 30px;">
              """+img48h_tag+"""
            </div>
            <h2>Last 48 hours</h2>
          </div>
    
    
    
          <footer style="background-color: gray;">
            <p style="text-align: center;">Contact us on : <a href="mailto:[email protected]">[email protected]</a></p>
          </footer
        </body>
    
    
    
      </html>
    """
    return(Template(message))

Шаг 3. Отправьте электронное письмо

####################################################################
###########               Import python packages
####################################################################
import smtplib
import config
from utils.get_templates import get_html_template
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

####################################################################
###########               Send the email
####################################################################
def send_alerting_email(alert_message,type_alert):
    # get the email content created on step 2
    message_template = get_html_template(type_alert)
    #connect to the SMTP server
    s = smtplib.SMTP(host='smtp.gmail.com', port=587)
    s.starttls()
    s.login(config.smtp_email, config.smtp_password)
    # Send the email for each email on the recipient list
    for email in config.recipient_list:
        msg = MIMEMultipart() # create a message
        # add in the actual person name to the message template
        message = message_template.substitute()
        # setup the parameters of the message
        msg['From']=config.smtp_email
        msg['To']=email
        msg['Subject'] = alert_message
        # add in the message body
        msg.attach(MIMEText(message, 'html'))
        # send the message via the server set up earlier.
        s.send_message(msg)
        del msg
    # Terminate the SMTP session and close the connection
    s.quit()
    return('email sent :)')

Чуть ниже у вас есть пример электронного письма, полученного во время аномалии большого объема. Мы получим такое же электронное письмо, когда будет обнаружена аномалия с низким объемом.

III. Мизан продакшн

Последней частью этого проекта является автоматизация задания, чтобы оно выполнялось каждую минуту.
Для этого мы будем использовать два подхода: первый — автоматизировать с помощью crons, а второй — с помощью Rundeck.

  1. Метод №1: Cron
    Итак, здесь мы будем использовать crontab для автоматизации нашей работы. Единственное задание, которое мы запустим, это main.py (перейдите на github, чтобы увидеть этот файл python: https://github.com/Faouzizi/Alerting-System)
    На вашем сервере или в терминале выполните следующие командные строки. :
- crontab -l # this command line will show you all running crons
- crontab -e # this command line will let you edit crons
# past this command line to run your main file every mintues- * * * * * env mysql_user='xxxxx' mysql_password='xxxxx' mysql_host='xxxxx' smtp_password='xxxxx' mysql_db='xxxxx' smtp_email='xxxxx' recipient_lis='xxxxx' python3 main.py

Если вам нужна дополнительная информация о cron, перейдите сюда: https://medium.com/@elfao/scheduling-jobs-using-crontab-844c7e135d97

Скрипты там: https://github.com/Faouzizi/Alerting-System

Не забудьте поставить лайк и прокомментировать эту статью, если у вас есть какие-либо вопросы, я отвечу на них.

2. Метод №2: Рандек

Если вы предпочитаете, вы можете использовать Rundeck для автоматизации задания, которое будет запускаться каждую минуту.
Я объясню, как использовать Rundeck, в следующей статье.

Заключение:
На этом мы заканчиваем нашу статью о системе оповещения в реальном времени. Теперь вы можете создать свой собственный.
Если у вас есть вопросы, не стесняйтесь задавать их мне 😄

Вы новичок в Medium?
Не стесняйтесь подписаться менее чем за 5 долларов США здесь, чтобы получать неограниченные выгоды и улучшать свои навыки.