Создайте алгоритм кластеризации, используя Dynamic Time Warping (DTW) и данные кривой дневной доходности, чтобы прогнозировать цикл рецессии в США.

Автор: Антон Гордон

Введение

В этой статье мы создадим алгоритм кластеризации, используя Dynamic Time Warping (DTW) и данные дневной кривой доходности, чтобы предсказать цикл рецессии в США. Мы будем использовать алгоритм усиленного дерева для захвата эффектов взаимодействия и разработки модели прогнозирования. Приложение будет развернуто с использованием Flask, AWS Elastic Container Registry (ECR) и Elastic Container Service (ECS) с определением задачи Fargate. Мы включим элементы из статьи Таснима Райхана «Предсказание рецессии в США: упражнение по динамическому искажению времени в экономике», чтобы объяснить математическую основу DTW и ее применение в прогнозировании рецессии.

Обратите внимание, что это «структура», вам нужно будет использовать свои уникальные навыки в эконометрике, чтобы определить лучший набор функций, схему нормализации и тип модели для создания наилучшего решения.

Динамическое искажение времени: математическое объяснение

Dynamic Time Warping — это мера сходства временных рядов, целью которой является поиск оптимального соответствия между двумя временными рядами путем минимизации расстояния между ними. Математически, учитывая два временных ряда X = {x1, x2, …, xm} и Y = {y1, y2, …, yn}, расстояние DTW вычисляется с использованием подхода динамического программирования. Алгоритм создает матрицу размера m x n, в которой каждая ячейка (i, j) содержит расстояние между xi и yj (Raihan, 2017).

Цель состоит в том, чтобы найти путь через матрицу с минимальным накопленным расстоянием с учетом определенных ограничений, например, начиная с (1, 1) и заканчивая (m, n), и допуская только диагональные, горизонтальные или вертикальные перемещения. Общее накопленное расстояние этого пути представляет собой расстояние DTW между двумя временными рядами (Raihan, 2017).

Применение DTW в прогнозировании рецессии

Raihan (2017) предлагает использовать DTW для выявления сходства экономических показателей между разными периодами рецессии. Применяя DTW к историческим экономическим данным, таким как кривые доходности, мы можем группировать похожие периоды и анализировать закономерности, ведущие к рецессиям. Это может помочь определить ранние предупреждающие сигналы о предстоящих циклах рецессии, что делает его ценным инструментом для экономического прогнозирования.

В этой статье мы расширяем подход Raihan (2017), комбинируя DTW с алгоритмом форсированного дерева, чтобы построить более надежную модель прогнозирования циклов рецессии в США. Алгоритм усиленного дерева может фиксировать эффекты взаимодействия между различными экономическими индикаторами, что еще больше повышает прогностическую силу нашей модели.

Пошаговый процесс

Импортировать необходимые библиотеки

import pandas as pd
import numpy as np
from fredapi import Fred
from sklearn.preprocessing import MinMaxScaler
from sklearn.pipeline import Pipeline
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_reportp

Получите ключ API от FRED (экономические данные Федеральной резервной системы) и настройте клиент FRED.

api_key = "your_api_key_here"
fred = Fred(api_key=api_key)
#Download daily yield curve data from FRED
features = {
    'T10Y2Y': '10-Year Treasury Constant Maturity Minus 2-Year Treasury Constant Maturity',
    'T10Y3M': '10-Year Treasury Constant Maturity Minus 3-Month Treasury Constant Maturity',
    'T10YFF': '10-Year Treasury Constant Maturity Minus Federal Funds Rate',
    'TB3SMFFM': '3-Month Treasury Bill Minus Federal Funds Rate'
}
data = pd.concat([fred.get_series(k) for k in features.keys()], axis=1)
data.columns = features.keys()

Загрузите данные о рецессии в США с сайта FRED.

us_recession_data = fred.get_series("USREC")

Нормализуйте данные кривой доходности с помощью MinMaxScaler и определите оптимальные лаги.

# Install FastDTW library: !pip install fastdtw
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean

# Normalize the yield curve data using MinMaxScaler and determine optimal lags
scaler = MinMaxScaler()
yield_curve_data_scaled = scaler.fit_transform(data)

lags = [1, 3, 5, 7, 10, 15, 20, 25]  # Change these to test different lags

yield_curve_lagged = pd.DataFrame()
for lag in lags:
    yield_curve_lagged[f"lag_{lag}"] = yield_curve_data_scaled[:-lag].reshape(-1)

# Calculate DTW distances for each lag
dtw_distances = []
for i in range(len(yield_curve_data_scaled) - 1):
    distance, _ = fastdtw(yield_curve_data_scaled[i], yield_curve_data_scaled[i + 1], dist=euclidean)
    dtw_distances.append(distance)

# Add DTW distances to the DataFrame
yield_curve_lagged["dtw_distance"] = [np.nan] + dtw_distances

yield_curve_lagged["target"] = [np.nan] + us_recession_data[1:].values.tolist()

# Align the length of the 'target' column with the length of the DataFrame
yield_curve_lagged = yield_curve_lagged.iloc[:-1]

Создайте модель расширенного дерева, используя конвейер и GridSearchCV.

from sklearn.model_selection import train_test_split

# Drop rows with NaN values due to lagging and DTW calculation
yield_curve_lagged = yield_curve_lagged.dropna()

# Create feature matrix X and target vector y
X = yield_curve_lagged.drop("target", axis=1)
y = yield_curve_lagged["target"]

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Create the boosted tree model using a pipeline and GridSearchCV
pipeline = Pipeline([
    ("gbc", GradientBoostingClassifier())
])

params = {
    "gbc__n_estimators": [100, 200, 300],
    "gbc__learning_rate": [0.01, 0.1, 0.2],
    "gbc__max_depth": [3, 4, 5]
}

grid_search = GridSearchCV(pipeline, param_grid=params, cv=5, scoring="accuracy", verbose=1)
grid_search.fit(X_train, y_train)

best_model = grid_search.best_estimator_
print("Best parameters:", grid_search.best_params_)

# Evaluate the model's performance
y_pred = best_model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
cm = confusion_matrix(y_test, y_pred)
report = classification_report(y_test, y_pred)

print(f"Accuracy: {accuracy}")
print(f"Confusion Matrix:\n{cm}")
print(f"Classification Report:\n{report}")

Сохраните артефакты модели в корзину S3.

#Save the trained model and scaler artifacts to an S3 bucket
import boto3
import joblib
from io import BytesIO

# Save the model and scaler to local files
joblib.dump(model, 'trained_model.pkl')
joblib.dump(scaler, 'scaler.pkl')

# Set up the S3 client
s3 = boto3.client('s3', aws_access_key_id='your_access_key', aws_secret_access_key='your_secret_key')

# Upload the model and scaler to S3
with open('trained_model.pkl', 'rb') as model_file:
    s3.upload_fileobj(model_file, 'your_s3_bucket_name', 'trained_model.pkl')

with open('scaler.pkl', 'rb') as scaler_file:
    s3.upload_fileobj(scaler_file, 'your_s3_bucket_name', 'scaler.pkl')

Загрузите модель в корзину S3.

import boto3
import pickle

s3 = boto3.client('s3')
bucket_name = 'your_bucket_name_here'

# Load the model
model_object = s3.get_object(Bucket=bucket_name, Key='model.pkl')
model = pickle.loads(model_object['Body'].read())

# Load the scaler
scaler_object = s3.get_object(Bucket=bucket_name, Key='scaler.pkl')
scaler = pickle.loads(scaler_object['Body'].read())

Создайте фляжное приложение и конечную точку прогнозирования.

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    yield_curve = np.array(data['yield_curve']).reshape(1, -1)
    scaled_yield_curve = scaler.transform(yield_curve)
    prediction = model.predict(scaled_yield_curve)
    return jsonify({'prediction': int(prediction[0])})

if __name__ == '__main__':
    app.run(debug=True)

Создайте Dockerfile для приложения Flask с необходимыми зависимостями.

# Use an official Python runtime as a parent image
FROM python:3.9

# Set the working directory to /app
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Install any needed packages specified in requirements.txt
RUN pip install --trusted-host pypi.python.org -r requirements.txt

# Make port 80 available to the world outside this container
EXPOSE 80

# Define environment variable
ENV NAME World

# Run app.py when the container launches
CMD ["python", "app.py"]

Ниже приведены шаги по развертыванию приложения Flask с использованием AWS ECR, ECS с определением задачи Fargate.

Создайте новый репозиторий в AWS Elastic Container Registry (ECR) для своего приложения.

aws ecr create-repository --repository-name my-repo

Создайте образ Docker с помощью файла Dockerfile.

docker build -t my-repo .

Пометьте образ Docker и отправьте его в репозиторий ECR.

docker tag my-repo:latest <ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/my-repo:latest
aws ecr get-login-password --region <REGION> | docker login --username AWS --password-stdin <ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com
docker push <ACCOUNT_ID>.dkr.ecr.<REGION>.amazonaws.com/my-repo:latest

Создайте файл определения задачи Amazon ECS, task-definition.json, используя тип запуска Fargate и указав образ Docker, отправленный в ECR.

aws ecs register-task-definition --cli-input-json file://task-definition.json

Создайте новый сервис Amazon ECS, используя тип запуска Fargate и определение задачи.

aws ecs create-service --cluster my-cluster --service-name my-service --task-definition my-task-definition --desired-count 1 --launch-type FARGATE --platform-version LATEST --network-configuration "awsvpcConfiguration={subnets=[subnet-12345,subnet-67890],securityGroups=[sg-12345],assignPublicIp=ENABLED}"

Настройте службу для использования нужного количества задач и масштабирования в соответствии с вашими требованиями. (Можно обновить параметр — required-count в предыдущей команде или использовать команду aws ecs update-service.)

Создайте или используйте существующие VPC, подсети и группы безопасности для службы (если это еще не сделано). Вы также можете использовать Консоль управления AWS или команды CLI:

aws ec2 create-vpc, aws ec2 create-subnet, and aws ec2 create-security-group

Затем мы настраиваем службу для использования балансировщика нагрузки приложений (ALB), если это необходимо (вы также можете использовать балансировщик сетевой нагрузки). Вы можете создать ALB с помощью Консоли управления AWS или с помощью следующей команды:

aws elbv2 create-load-balancer

Отслеживайте статус сервиса в Консоли управления AWS. Протестируйте приложение Flask, обратившись к его конечной точке через ALB или общедоступный IP-адрес запущенной задачи Fargate. Наконец, настройте любой необходимый мониторинг, ведение журналов или оповещения для вашего приложения с помощью Amazon CloudWatch. Вы можете использовать Консоль управления AWS или команды CLI, такие как aws cloudwatch put-metric-alarm, для настройки сигналов тревоги.

Заключение

В этой статье мы продемонстрировали, как построить прогностическую модель для цикла рецессии в США, используя динамическое искажение времени и данные кривой дневной доходности. Мы использовали алгоритм усиленного дерева для захвата эффектов взаимодействия и разработали модель прогнозирования. Приложение было развернуто с помощью Flask, AWS Elastic Container Registry (ECR) и Elastic Container Service (ECS) с определением задачи Fargate.

Включив элементы из статьи Таснима Райхана, мы обеспечили более глубокое понимание математической основы DTW и ее применения в прогнозировании рецессии. Комбинация DTW с алгоритмом форсированного дерева позволила нам точно зафиксировать базовые закономерности в данных кривой доходности и построить более надежную модель прогнозирования циклов рецессии в США. Развертывание в инфраструктуре AWS обеспечивает простое масштабирование и интеграцию с другими сервисами, что делает его ценным инструментом для экономического анализа и прогнозирования.

Ссылка

Райхан, Т. (2017). Прогнозирование рецессии в США: динамическое упражнение по искажению времени в экономике. Доступно в SSRN: https://ssrn.com/abstract=3047649 или http://dx.doi.org/10.2139/ssrn.3047649

Дополнительные материалы на PlainEnglish.io.

Подпишитесь на нашу бесплатную еженедельную рассылку новостей. Подпишитесь на нас в Twitter, LinkedIn, YouTube и Discord .