Начните путешествие с данными с некоторых первых шагов процесса приема данных.

В жизни каждого аналитика наступает момент, когда он должен получать, обрабатывать и хранить данные, чтобы иметь то, что им нужно для фазы анализа. К сожалению, не все чисто и красиво упаковано из стандартного набора данных. Если вы не являетесь опытным инженером по обработке данных или не привыкли к процессу ETL, это может оказаться сложной задачей и вызвать некоторые концептуальные проблемы, с которыми вам придется работать.

Я подумал, раз уж я прошел через этот процесс и он свеж в памяти, я напишу статью и, надеюсь, помогу кому-нибудь в их собственной проблеме. Предупреждаю, что это не выставочный код. В моем случае это работает, но на виду, бородавки и все такое. Я очень мало занимался очисткой или оптимизацией, так что имейте это в виду.

Давайте начнем.

Требования

Я сделал это в основном довольно простым с точки зрения того, что ему нужно:

- Python (я использовал версию 3.9, поскольку она работает на моей рабочей станции для разработки)

- панды

- psycopg2 (библиотека Python для доступа к базе данных)

- Datapungi_fed (эта библиотека обеспечивает легкий доступ к базам данных Федеральной резервной системы)

- PostgreSQL (в настоящее время я использую версию 13, но почти любая последняя версия будет работать для того, что мы здесь делаем)

Совсем неважно.

Примечание. Для получения этих данных вам необходимо зарегистрироваться в федеральном резерве, чтобы получить ключ API. Они доступны по адресу: https://research.stlouisfed.org/docs/api/api_key.html

Данные

Сначала поговорим о данных, которые мы получим.

Федеральная резервная система поддерживает огромную базу данных всевозможных наборов экономических данных, разбросанных по многочисленным базам данных. Почти все это наборы временных рядов, которые представлены в форме даты и данных, извлеченных в соответствии с различными идентификаторами отчета.

Краткое примечание об использовании: Федеральная резервная система предоставляет набор данных для личного некоммерческого использования, поэтому вы можете создавать и анализировать выбранные вами длины. Но, если вы хотите прочитать все условия, вот ссылка на юридический FAQ: https://fred.stlouisfed.org/legal/

Библиотека datapungi_fed выполняет короткую работу по фактическому извлечению процесса и берет на себя всю базовую работу. Мы просто вызываем отчет по имени, и библиотека возвращает нам серию, которой мы затем можем управлять с помощью панд.

Я обнаружил, что в повседневном использовании я использовал API больше, чем мне действительно хотелось. Я хотел убедиться, что: а) я ограничил доступ к API до приемлемых чисел. В конце концов, я вежливый хранитель данных. Б) Я хотел, чтобы данные были ближе к моему конечному пункту назначения и чтобы предотвратить проблемы с подключением, обслуживанием сервера и другие проблемы с подключением. Я также хотел иметь возможность выполнять больше серверной обработки там, где это необходимо, и снимать некоторую нагрузку с передней части. По этой причине я поместил это в базу данных.

Другими данными, которые я хотел получить от ФРС, был список выпускаемых ежедневных отчетов. Это было немного иначе и включало строку, содержащую идентификатор, даты начала и окончания, название отчета, ссылку, флаг пресс-релиза и примечания.

Во втором запросе была проведена некоторая дезинфекция данных для правильной загрузки, но мы тоже будем работать над этим.

База данных

Простая схема, которую я составил для базы данных, состоит из четырех таблиц:

  1. fed_reports
  2. fed_reports_tmp
  3. fed_releases
  4. fed_releases_tmp

Таблицы plain и _tmp по сути являются копиями друг друга. Они служат для выполнения начальной вставки в базу данных в таблице _tmp, а затем копирования только новых данных в основную таблицу, после чего таблицы _tmp очищаются.

Структура таблиц 1 и 2:

CREATE TABLE public.fed_reports (
  report_date DATE NOT NULL,
  data NUMERIC NOT NULL,
  report_name VARCHAR NOT NULL,
  hash VARCHAR NOT NULL PRIMARY KEY
);

Точно так же структура таблиц 3 и 4:

CREATE TABLE public.fed_reports (
  release_date DATE NOT NULL,
  report_name VARCHAR NOT NULL,
  report_link VARCHAR NOT NULL,
  notes VARCHAR NOT NULL,
  hash VARCHAR NOT NULL PRIMARY KEY
);

Это обеспечивает четыре контейнера для данных на различных этапах процесса. Теперь нам нужны некоторые методы, чтобы все могло перемещаться должным образом.

Первые две функции предназначены для присвоения хэш-значения содержимому таблиц _tmp. Это сделано, потому что мне нужны только новые данные. Хешируя содержимое, я определяю уникальный ключ для каждой строки, который дает мне что-то сопоставимое при вставке. Я расскажу об этом подробнее позже, когда перейду к собственно поисковому коду, так что наберитесь терпения.

CREATE OR REPLACE FUNCTION public.rehash_fed_reports()
  RETURNS integer
  LANGUAGE 'plpgsql'
  VOLATILE
  PARALLEL UNSAFE
  COST 100
AS $BODY$
    begin
      UPDATE fed_reports_tmp
      SET hash = md5(
        cast(report_date as VARCHAR) ||
        cast(data as VARCHAR) ||
        cast(report_name as VARCHAR)
      )
    return 0;
  end;
  $BODY$;
CREATE OR REPLACE FUNCTION public.rehash_fed_releases()
  RETURNS integer
  LANGUAGE 'plpgsql'
  VOLATILE
  PARALLEL UNSAFE
  COST 100
AS $BODY$
  begin
    UPDATE fed_releases
    SET = md5(
      cast(release_date as VARCHAR) ||
      cast(report_name as VARCHAR) ||
      cast(report_link as VARCHAR) ||
      cast(notes as VARCHAR)
    )
    return 0;
  end;
  $BODY$;

Обе эти функции в основном похожи. Они выполняют определенное обновление строк таблицы и возвращают целое число 0. Нет никакой реальной проверки ошибок или какой-либо другой отказоустойчивости просто потому, что если они не выполнят свой процесс, функция вернет ошибку, и транзакция откатится на дефолт. Это упрощает работу с этим приложением.

Обратите внимание, что для использования функции md5 я конвертирую все данные в текстовый формат и объединяю результаты. Пока используется одна и та же методология, это не проблема.

Наконец, я определил функцию для перемещения данных.

CREATE OR REPLACE FUNCTION public.fed_report_update()
  RETURNS integer
  LANGUAGE 'plpgsql'
  VOLATILE
  PARALLEL UNSAFE
  COST 100
AS $BODY$
  begin
  
    INSERT INTO fed_releases (
      release_date, 
      report_name, 
      report_link, 
      notes, 
      hash)
    SELECT * FROM 
      fed_releases_tmp 
    WHERE 
      hash NOT IN (
        SELECT hash FROM fed_releases
    );
  
    INSERT INTO 
      fed_reports (
        report_date, 
        data, 
        report_name, 
        hash)
    SELECT * FROM 
      fed_reports_tmp 
    WHERE 
      hash NOT IN (
        SELECT hash FROM fed_reports);
return 0;
    end;
  $BODY$;

Эта функция просто использует оператор INSERT INTO для выполнения вставки на основе результатов подзапроса. В этом случае я выбираю все из каждой таблицы _tmp, в которой хэш еще не существует в основной таблице. Это гарантирует, что добавляются только новые данные.

В качестве альтернативы я мог бы просто каждый раз удалять данные в основной таблице и повторно вставлять все без всех шагов хеширования, но в случае выпущенных отчетов мне нужна ежедневная история, и я не могу получить эти данные снова, так что это необходимо. Если я сделаю это для одного, почему бы просто не использовать магию копирования и вставки и не сделать это для обоих?

Наконец, единственное, что я здесь не помещаю, так это функцию очистки таблиц _tmp. У меня они заключены в некоторые другие процессы, которые я использую как часть более крупного приложения, но, учитывая возможности примеров кода, вы легко сможете сделать это самостоятельно.

И это база данных. Достаточно просто, правда?

Код для извлечения

Теперь мы переходим к самому интересному - к коду Python.

По нескольким причинам это не будет полным сценарием «все-в-одном», поскольку есть некоторые вещи, которые не имеют отношения к моему собственному коду (и мне слишком лень переписывать рабочий шаблон только для этой статьи), но не останется ничего важного. Но если вы все же застряли, просто оставьте комментарий, и я постараюсь дать вам ответ - в отличие от некоторых авторов, я проверяю и пытаюсь ответить.

Во-первых, давайте сделаем важные вещи:

import pandas as pd
import datapungi_fed as dpf
data = dpf.data("put your API code here")
# Note: please change your database, username & password as per your own values
conn_params_dic = {
  "host": "address of database",
  "database": "name of database",
  "user": "user of database",
  "password": "password of database"
}

Мы сделали две вещи: 1) мы установили переменную для нашего ключа API и 2) определили строку подключения для нашей базы данных. Оба они позволяют получить доступ к двум необходимым нам внешним ресурсам.

def get_report(report_name):
  try:
    df = data.series(report_name)
    df.rename(columns={report_name: 'data'}, inplace=True)
    df['report'] = report_name
    df.reset_index(drop=False, inplace=True)
  except:
    df = pd.DataFrame(columns = ['date', 'data', 'report'])
    print(report_name + ' Empty Set')
  return df

Затем я создаю быструю служебную функцию, которая принимает имя отчета в качестве входных данных и использует созданный объект данных для извлечения отчета, который имеет ряд с именами date и report_name в качестве столбцов. Поскольку я храню все отчеты в одной большой таблице с идентификатором имени отчета, я хочу сделать все единообразным, поэтому я переименовал столбец report_name в просто «данные». Затем я создаю столбец «отчет» для хранения имени и отбрасываю индекс, который по умолчанию является датой.

Это помещает все в надлежащий формат.

Если есть пустой отчет, что время от времени случается, я просто возвращаю пустой фрейм данных с заголовками и печатаю предупреждение.

Разобравшись с предварительными мероприятиями, давайте сначала сделаем исключение, то есть выпущенные отчеты, и устраним их.

released = data.releases()
released.drop(columns=['id', 'realtime_end', 'press_release'], inplace = True)
released.rename(columns={'realtime_start': 'Date'}, inplace=True)
released.rename(columns={'name': 'Report Name'}, inplace=True)
released.rename(columns={'link': 'Link'}, inplace=True)
released.rename(columns={'notes': 'Notes'}, inplace=True)
released['Report Name'] = released['Report Name'].str.replace(r'"', '')
released['Report Name'] = released['Report Name'].str.replace(r',', '')
released['Link'] = released['Link'].str.replace(r'"', '')
released['Link'] = released['Link'].str.replace(r',', '')
released['Notes'] = released['Notes'].str.replace(r'"', '')
released['Notes'] = released['Notes'].str.replace(r',', '')
released['Notes'] = released['Notes'].str.replace(r'\n', '')
released['Notes'] = released['Notes'].str.replace(r'\r', '')
released['Notes'] = released['Notes'].str.replace(r'\\', '')
released['hash'] = released.apply(lambda x: hash(tuple(x)), axis=1)

Это ужасный фрагмент кода, но он выполняет свою работу. Если бы я был более амбициозным или это было бы больше, чем единичный раз, я бы, вероятно, сделал больше, чтобы поместить его в контейнер и сделать его немного более портативным, но это то, что есть.

Здесь мы берем новые выпуски отчетов (предназначенные для таблицы fed_releases). Это хранится в «релизах». Затем я отбрасываю столбцы, которые для меня не важны, такие как ID, realtime_end и press_release.

Далее переименовываю оставшиеся столбцы. Это не является строго необходимым, но служит отходом от более раннего способа, которым я использовал эти данные, поэтому они были оставлены в процессе.

После этого содержимое нужно очистить. С этими данными не уделяется особого внимания, поэтому возникает много плохого форматирования. Некоторые простые замены решают проблемы.

Поскольку эти данные сначала будут перемещены в CSV, прежде чем они будут скопированы в базу данных с помощью команды COPY, большинство проблем связано с этим преобразованием. Мы удаляем такие вещи, как запятые, кавычки, символы обратной косой черты и, конечно же, символы новой строки. Любой из них может довольно быстро испортить ваш день и привести к очень бесполезным сообщениям об ошибках.

Последний шаг - это хэш с использованием хеш-библиотеки python.

Но Брэд, мы только что прошли весь этот процесс в последнем разделе о хешировании данных в базе данных с помощью функции md5 - что дает?

Короткий ответ заключается в том, что иногда результаты хеш-функции python в этом случае несовместимы. Я обнаружил, что глупости причиняют мне боль, так что это был познавательный опыт.

Я оставил это здесь по уважительной причине. Мое хеш-значение в моей таблице было определено как первичный ключ. Я мог бы отказаться от этого и сделать поле обнуляемым, или я мог бы сделать несколько других обходных решений, но я понял, что будет намного меньше работы просто обновить значения с новым хешем после импорта. Я выбрал путь наименьшего количества работы. Если вы хотите поиграть с ним, у вас могут быть лучшие результаты, но я знаю, что моя текущая установка работает надежно.

Пройдя дальше, мы готовы сделать первую вставку. Я обычно использую некоторые другие функции, которые я определил для этой цели, но я также помещу их сюда для вашей справки:

conn = dbf.connect(conn_params_dic)
conn.autocommit = True
dbf.copy_from_dataFile(conn, released, 'fed_releases_tmp')

Ссылка dbf относится к отдельному модулю, который у меня есть, который просто содержит функции базы данных, которые я использую. У меня есть функция подключения и функция copy_from_datafile. Вот эти функции для справки:

def connect(conn_params_dic):
  conn = None
  try:
    conn = psycopg2.connect(**conn_params_dic)
    if debug == 1:
      print("Connection successful..................")
  except OperationalError as err:
    show_psycopg2_exception(err)
    # set the connection to 'None' in case of error
    conn = None
  return conn
def copy_from_dataFile(conn, df, table):
  tmp_df = "/tmp/insert_data.csv"
  df.to_csv(tmp_df, header=False, index=False)
  f = open(tmp_df, 'r')
  cursor = conn.cursor()
  try:
    cursor.copy_from(f, table, sep=",")
    conn.commit()
    print("Data inserted successfully....")
    cursor.close()
  except (Exception, psycopg2.DatabaseError) as error:
    os.remove(tmp_df)
    # pass exception to function
    show_psycopg2_exception(error)
    cursor.close()
  os.remove(tmp_df)

Кроме того, также используются эти две функции:

def show_psycopg2_exception(err):
  # get details about the exception
  err_type, err_obj, traceback = sys.exc_info()
  line_n = traceback.tb_lineno
  print("\npsycopg2 ERROR:", err, "on line number:", line_n)
  print("psycopg2 traceback:", traceback, "-- type:", err_type)
  print("\nextensions.Diagnostics:", err.diag)
  print("pgerror:", err.pgerror)
  print("pgcode:", err.pgcode, "\n")
def query_return(query):
  conn = connect(conn_params_dic)
  try:
    query_1 = pd.read_sql_query(query, conn)
    df = pd.DataFrame(query_1)
  except (Exception, psycopg2.Error) as error:
    print("Error while fetching data from PostgreSQL", error)
  finally:
    if conn:
      conn.close()
  return df

В целом, эти функции позволяют устанавливать соединение psycopg2, вставлять данные с помощью команды копирования, обрабатывать исключения и выбирать возникающие запросы.

Их можно либо включить в ваш основной файл загрузки, либо, как я сделал, поместить в отдельный модуль для доступа. Все зависит от того, что и как вы создаете свое приложение.

Сводка на полпути

Ух ты, это уже много вещей, которые мы уже рассмотрели. Давайте подышим на секунду и подведем итоги.

Мы создали нашу базу данных и настроили схему для хранения данных. Мы определили функции, которые управляют данными в серверной части и обрабатывают все, поэтому у нас есть инкрементные обновления.

В процессе получения данных мы настроили подключение к базе данных и API от Федеральной резервной системы. Мы получили список недавно выпущенных отчетов, обработали их на предмет некорректных символов и запустили функции для их вставки в базу данных.

К настоящему времени ваша таблица fed_releases_tmp должна быть забита новыми отчетами, готовыми к обработке.

Давайте перейдем к последней части этого пути, которая заключается в использовании списка идентификаторов отчетов для получения фактических данных отчета временных рядов, вставки их в базу данных и затем завершения, чтобы остались заполнены только основные таблицы.

Код извлечения, часть 2

Наш следующий шаг - определить список отчетов, которые нам нужны. Определите, что вас интересует, и это ваш бизнес, и вам нужно будет потратить некоторое время на https://fred.stlouisfed.org/, просматривая их список отчетов, чтобы определить, что вы хотите. Но как только вы получите этот список, ваша жизнь будет наполнена свежими экономическими данными.

Вот частичный образец моего списка:

report_list = [
  'WM1NS', # M1 Supply
  'WM2NS', # M2 Supply
  'ICSA', # Unemployment
  'CCSA', # Continued Unemployment
  'JTSJOL', # Job Openings: Total Nonfarm
  'PAYEMS', # Non-Farm Employment
  'RSXFS', # Retail Sales
  'TCU', # Capacity Utilization
  'UMCSENT', # Consumer Sentiment Index
  'BUSINV', # Business Inventories
  'INDPRO', # Industrial Production Index
  'GACDFSA066MSFRBPHI', # Philidelphia Fed Manufacturing Index
  'GACDISA066MSFRBNY', # Empire State Manufacturing Index
  'BACTSAMFRBDAL', # Current General Business Activity; Diffusion Index for Texas
  'IR', # Import Price Index
  'IQ', # Export Price Index
  'PPIACO', # Producer Price Index - all
  'CPIAUCSL', # Consumer Price Index - all
  'CPILFESL', # Consumer Price Index (Core)
  'MICH', # University of Michigan: Inflation Expectation
  'CSCICP03USM665S', # Consumer Opinion Surveys: Confidence Indicators: Composite Indicators: OECD Indicator for the United States
]

Очевидно, это повсюду, но показывает множество общих экономических показателей. Но нам нужно обработать это:

# Pull all the reports into a big honking dataframe
all_rep = []
for i in report_list:
  df1 = get_report(i)
  all_rep.append(df1)
df = pd.concat(all_rep)
df['hash'] = df.apply(lambda x: hash(tuple(x)), axis=1)

Очень просто, я создаю список all_rep как пустой заполнитель. Оттуда я читаю свой список report_list и для каждого отчета вызываю функцию get_report, помещая результаты в df1. Я беру результат и добавляю df1 к all_rep.

Я выполняю окончательную конкатенацию, а затем хеширую полученные данные в новый столбец с именем «хэш». См. Мое предыдущее обсуждение этого вопроса.

Теперь у нас есть последняя серия, которую можно передать в функцию и вставить:

conn = dbf.connect(conn_params_dic)
  conn.autocommit = True
  dbf.copy_from_dataFile(conn, df, 'fed_reports_tmp')

Это то же самое, что мы сделали с fed_releases.

На этом этапе обе наши таблицы _tmp должны быть заполнены и готовы к обработке.

Это подводит нас к нашему последнему шагу:

# Final Cleanup
try:
  cursor = conn.cursor()
  query = "SELECT rehash_fed_reports_tmp()"
  cursor.execute(query)
  return_val = cursor.fetchall()
  conn.commit()
  cursor = conn.cursor()
  query = "SELECT rehash_fed_releases_tmp()"
  cursor.execute(query)
  return_val = cursor.fetchall()
  conn.commit()
  cursor = conn.cursor()
  query = "SELECT fed_report_update()"
  cursor.execute(query)
  return_val = cursor.fetchall()
  conn.commit()
  cursor = conn.cursor()
  query = "SELECT symbol_cleanup()"
  cursor.execute(query)
  return_val = cursor.fetchall()
  conn.commit()
  print("Records updated and cleaned up.......")
except (Exception, dbf.psycopg2.Error) as error:
  print("Error while fetching data from PostgreSQL", error)
finally:
  # closing database connection.
  if conn:
    cursor.close()
    conn.close()
    print("PostgreSQL connection is closed")

На этом шаге мы выполняем наши последние операторы выбора для запуска функций, которые мы ранее определили. Первые две перефразируют таблицы _tmp. Третья функция обновляет первичные таблицы из таблиц _tmp. Последняя функция очищает таблицы _tmp и подготавливает их к следующему запуску.

Последний процесс - просто закрыть наше соединение с базой данных и выйти с несколькими дружественными сообщениями, чтобы сообщить нам, что все в порядке.

Планирование

И последнее, что мне нужно сделать - запустить это без присмотра. Для этого у меня есть базовая запись cron, которая запускает процесс каждые 6 часов в будние дни:

* */6 * * 1-5 /Library/Frameworks/Python.framework/Versions/3.9/bin/python3 ./data_miners/pull_fed_data.py 2>&1

Заключение

Трудно сказать, что заняло больше времени, если написать весь процесс или эту статью, объясняющую процесс. Нельзя сказать, что эта статья заняла много времени, более того, как только вы усвоите ритм, довольно легко получить данные, очистить их и бросить в базу данных для анализа.

Я рекомендую работать над этими типами процессов. Лучший учитель - невзгоды и устранение ошибок. Где это можно улучшить? Какие ошибки вы видите? Есть ли у вас лучшее решение?

Но в мире науки о данных и анализа данных получение данных - это только первый шаг. Используя эту статью в качестве руководства, теперь у вас есть целый набор актуальных и актуальных данных временных рядов, с которыми можно поиграть. Что ты будешь с этим делать?