Простой проект на конвейерах ETL в Python

Добро пожаловать в мир интеграции и кластеризации данных! В этом блоге мы углубимся в интригующее задание, демонстрирующее возможности Python в управлении и анализе данных из различных источников. В нашу задачу входит построение таблиц измерений и фактов, выполнение инкрементной загрузки данных и вычисление ежедневной статистики. Давайте рассмотрим, как мы достигаем этих целей шаг за шагом.

_____________________________________________________________

Чтобы начать наше путешествие, мы начнем с начального набора данных, расположенного в каталоге 01SOURCE/INITIAL в Datalake. Этот набор данных под названием «Sales Superstore.xlsx» служит нашей основой. Первая часть нашего задания включает в себя создание таблиц DIM_CUSTOMER, DIM_PRODUCT и FACT_DAILY_STATS на основе этих исходных данных.

Таблица DIM_CUSTOMER: таблица DIM_CUSTOMER содержит важную информацию о клиенте, такую ​​как идентификатор клиента и имя клиента. Используя библиотеку pandas в Python, мы фильтруем и удаляем дубликаты из исходного набора данных, чтобы создать таблицу DIM_CUSTOMER. Для обеспечения уникальности мы присваиваем каждому клиенту уникальный идентификатор (Customer_UID).

Таблица DIM_PRODUCT. Подобно таблице DIM_CUSTOMER, в таблице DIM_PRODUCT хранятся данные, связанные с продуктом, включая идентификатор продукта, название продукта, категорию и подкатегорию. Опять же, мы удаляем дубликаты и присваиваем каждому продукту уникальный Product_UID.

Таблица FACT_DAILY_STATS. Таблица FACT_DAILY_STATS служит хранилищем ежедневной статистики продаж. Мы объединяем таблицы DIM_CUSTOMER и DIM_PRODUCT с исходным набором данных на основе общих ключей (Customer ID и Product ID соответственно). Этот процесс слияния позволяет нам получить исчерпывающий набор данных с подробной информацией о транзакциях. С помощью Transaction_UID, назначенного каждой записи, мы собираем важные сведения, такие как идентификатор заказа, дата заказа, дата отгрузки, режим отгрузки, продажи, количество и прибыль.

import pandas as pd
import os
import glob
import time

# To change the Datalake starting directory accordingly
GLOBAL_START_FILEPATH = r"C:\Users\matth\Documents\DATALAKE"
SOURCE_FILEPATH = GLOBAL_START_FILEPATH + r"\01SOURCE\INITIAL"
INCREMENTAL_FILEPATH = GLOBAL_START_FILEPATH + r"\01SOURCE\INCREMENTAL"
SOURCE = SOURCE_FILEPATH + "\Sales Superstore.xlsx"
df = pd.read_excel(SOURCE)

DIM_FILEPATH = GLOBAL_START_FILEPATH + r"\02DIM"
FACT_FILEPATH = GLOBAL_START_FILEPATH + r"\03FACT"
DATAMART_FILE = GLOBAL_START_FILEPATH + r"\04DATAMART"

  1. Отслеживание Created_Time: в основе функции «add_time» лежит возможность отслеживать «Created_Time». Используя библиотеку pandas, он создает новый столбец с именем «Created_Time» и заполняет его текущей отметкой времени с помощью функции «pd.Timestamp(‘now’)». Эта отметка времени отмечает точный момент начала процесса загрузки данных, фиксируя точное время начала интеграции данных.
  2. Генерация Audited_Time: Затем функция генерирует «Audited_Time», который является форматированным представлением «Created_Time» для целей аудита. Для этого он использует функцию «pd.to_datetime()» pandas для преобразования столбца «Created_Time» в формат даты и времени. Впоследствии параметр «format=’%H:%M’» используется для указания желаемого формата времени в виде часов и минут (ЧЧ:ММ). Результатом стал новый столбец с именем «Audited_Time», обеспечивающий четкое и удобочитаемое представление времени начала интеграции данных.
  3. Повышение целостности данных: для дальнейшего обеспечения целостности данных столбец «Created_Time» удаляется из DataFrame с помощью «del sd[‘Created_Time’]». Этот шаг устраняет любую путаницу и предотвращает избыточность в наборе данных, оставляя после себя только ценную информацию, включая «Audited_Time.
## function to track the actual time of initial loading and increment loading for auditing
def add_time(sd):
    sd['Created_Time'] = pd.Timestamp("now")
    # sd['new_date'] = pd.to_datetime(sd['Created_Time'], format='%Y-%m-%d').dt.date
    sd['Audited_Time'] = pd.to_datetime(sd['Created_Time'], format='%H:%M').dt.time

    del sd['Created_Time']
    return sd

Функция «initialLoad». Функция «initialLoad» служит краеугольным камнем нашего путешествия по интеграции данных, эффективно создавая таблицы DIM и FACT на основе исходного источника данных.

  1. Построение DIM_CUSTOMER: функция «initialLoad» начинается с построения таблицы DIM_CUSTOMER. Он идентифицирует и выбирает соответствующие столбцы («Идентификатор клиента» и «Имя клиента») из исходного источника данных («df»). Используя функцию «фильтра» и удаляя все повторяющиеся строки, он гарантирует, что таблица DIM_CUSTOMER содержит уникальные данные о клиентах. «Customer_UID» присваивается последовательно каждой записи клиента, устанавливая уникальный идентификатор для дальнейшего использования. Впоследствии таблица DIM_CUSTOMER сохраняется в виде файла Excel, что облегчает дальнейшую обработку и анализ данных.
  2. Создание DIM_PRODUCT. Следуя той же методике, функция переходит к созданию таблицы DIM_PRODUCT. Здесь соответствующие столбцы («Идентификатор продукта», «Название продукта», «Категория» и «Подкатегория») фильтруются, а дубликаты удаляются. Подобно DIM_CUSTOMER, каждой записи продукта присваивается уникальный «Product_UID», что упрощает последующую обработку данных. Таблица DIM_PRODUCT также сохраняется в виде файла Excel, готового к включению в рабочий процесс интеграции данных.
  3. Генерация FACT_DAILY_STATS: следующей на очереди является таблица FACT_DAILY_STATS, которая представляет собой кульминацию первоначальных усилий по интеграции данных. Используя информацию из DIM_CUSTOMER и DIM_PRODUCT, функция «initialLoad» объединяет исходный набор данных с этими измерениями для создания комплексной таблицы FACT_DAILY_STATS. Он эффективно объединяет различные поля, такие как «Идентификатор заказа», «Дата заказа», «Дата отгрузки», «Режим доставки», «Customer_UID», «Product_UID», «Продажи», «Количество» и «Прибыль» в одном , сплоченный стол. Кроме того, каждой транзакции присваивается уникальный «Transaction_UID», что облегчает эффективное отслеживание данных. В качестве последнего шага таблица FACT_DAILY_STATS сохраняется в виде файла Excel, что завершает первоначальный процесс интеграции данных.
def initialLoad(df, DIM_FILEPATH, FACT_FILEPATH):
    print("Constructing DIM and FACT Tables based on /01Source/Initial")
    #DIM_CUSTOMER
    keep = ["Customer ID", "Customer Name"]
    global DIM_CUSTOMER
    DIM_CUSTOMER = df.filter(items=keep)
    DIM_CUSTOMER.drop_duplicates(inplace=True)
    DIM_CUSTOMER.reset_index(inplace=True, drop=True)
    DIM_CUSTOMER['Customer_UID'] = DIM_CUSTOMER.index + 1
    columns = ["Customer_UID", "Customer ID", "Customer Name"]
    DIM_CUSTOMER = DIM_CUSTOMER[columns]
    # DIM_CUSTOMER OUTPUT
    OUTPUT = DIM_FILEPATH + "\DIM_CUSTOMER.xlsx"
    DIM_CUSTOMER.to_excel(OUTPUT, index=False)

    # DIM_PRODUCT
    keep = ["Product ID", "Product Name", "Category", "Sub-Category"]
    global DIM_PRODUCT
    DIM_PRODUCT = df.filter(items=keep)
    DIM_PRODUCT.drop_duplicates(inplace=True)
    DIM_PRODUCT.reset_index(inplace=True, drop=True)
    DIM_PRODUCT['Product_UID'] = DIM_PRODUCT.index + 1
    temp_cols = DIM_PRODUCT.columns.tolist()
    new_cols = temp_cols[-1:] + temp_cols[:-1]

    DIM_PRODUCT = DIM_PRODUCT[new_cols]
    # DIM_PRODUCT OUTPUT
    OUTPUT_2 = DIM_FILEPATH + "\DIM_PRODUCT.xlsx"
    DIM_PRODUCT.to_excel(OUTPUT_2, index=False)

    # FACT_DAILY_STATS
    df_copy = df.copy()
    customer_merger = df_copy.merge(DIM_CUSTOMER, how='inner', on=['Customer ID'], suffixes=('_left', '_right'))
    complete_merger = customer_merger.merge(DIM_PRODUCT, how='inner', on=['Product ID'], suffixes=('_left', '_right'))
    drop_fields = ['Customer ID', 'Customer Name_left', 'Product ID', 'Category_left', 'Sub-Category_left',
                   'Product Name_left', 'Customer Name_right', 'Product Name_right', 'Sub-Category_right',
                   'Category_right']
    complete_merger.drop(labels=drop_fields, axis=1, inplace=True)
    complete_merger.drop_duplicates(inplace=True)
    complete_merger.reset_index(inplace=True, drop=True)
    complete_merger['Transaction_UID'] = complete_merger.index + 1
    cols = ["Transaction_UID", "Order ID", "Order Date", "Ship Date", "Ship Mode", "Customer_UID", "Product_UID", "Sales", "Quantity",
            "Profit"]

    global FACT_DAILY_STATS
    FACT_DAILY_STATS = complete_merger[cols]
    FACT_DAILY_STATS.reset_index(inplace=True, drop=True)
    FACT_DAILY_STATS['Transaction_UID'] = FACT_DAILY_STATS.index + 1

    # FACT_DAILY_STATS OUTPUT
    global OUTPUT_3
    OUTPUT_3 = FACT_FILEPATH + "\FACT_DAILY_STATS.xlsx"
    FACT_DAILY_STATS.to_excel(OUTPUT_3, index=False)


#getting data from the incremental files to insert new rows / update the existing row in DIM and FACT tables
#updates by replacing records with new values from incremental files, insert new rows if records are absent from source

Добавочная загрузка данных: построение на фундаменте

Загрузка исходных данных закладывает основу для всестороннего анализа. Однако на этом задача не заканчивается; данные постоянно развиваются и обновляются, что требует постепенного процесса загрузки данных для поддержания актуальности и актуальности информации. Функция «incrementalLoad» играет ключевую роль в этом процессе, эффективно обновляя таблицы DIM и FACT на основе файлов добавочных данных. Давайте рассмотрим, как эта функция обеспечивает плавную добавочную интеграцию данных.

  1. Выбор файла данных: функция «incrementalLoad» начинается с выбора файлов из каталога «INCREMENTAL_FILEPATH». Эти файлы представляют собой добавочные обновления данных, регулярно получаемые для поддержания актуальности и актуальности данных. Используя функцию «glob.glob», функция захватывает все файлы Excel в указанном каталоге.
  2. Интеграция инкрементных данных. В основе функции «incrementalLoad» лежит процесс интеграции инкрементных данных. Он начинается с сканирования каждого файла один за другим для определения ежедневных приращений. На основе ежедневного счетчика функция обрабатывает каждый файл и соответствующим образом обновляет таблицы DIM и FACT.
  3. DIM CUSTOMER INCREMENT LOAD: для инкрементного обновления таблицы DIM_CUSTOMER функция выполняет «внешнее слияние» между существующим DIM_CUSTOMER и инкрементными данными из выбранного файла. Результатом слияния является полный набор данных о клиентах, как существующих, так и вновь добавленных. «Customer_UID» переиндексируется для сохранения уникальности, а обновленная таблица DIM_CUSTOMER сохраняется в виде файла Excel.
  4. DIM PRODUCT INCREMENT LOAD: функция также постепенно обновляет таблицу DIM_PRODUCT. Он объединяет существующий DIM_PRODUCT с добавочными данными и пересчитывает «Product_UID» для сохранения уникальности. Кроме того, функция «add_time» используется для отслеживания «Audited_Time» этого обновления. Обновленная таблица DIM_PRODUCT затем сохраняется в виде файла Excel.
  5. ФАКТИЧЕСКАЯ ЕЖЕДНЕВНАЯ ИНКРЕМЕНТНАЯ НАГРУЗКА. Точно так же таблица FACT_DAILY_STATS подвергается инкрементным обновлениям. Функция выполняет «внешнее слияние» между существующим FACT_DAILY_STATS и добавочными данными. «Transaction_UID» переиндексируется для обеспечения уникальности, а отсутствующие «Customer_UID» и «Product_UID» заменяются средними значениями. Как и в случае с DIM_PRODUCT, функция «add_time» отслеживает «Audited_Time» этого обновления. Обновленная таблица FACT_DAILY_STATS сохраняется в виде файла Excel.
  6. Вычисление ежедневной статистики: после каждого добавочного обновления функция вызывает функцию «computeDailyStatistic» для вычисления ежедневной статистики продаж на основе обновленной таблицы FACT_DAILY_STATS. Эта ежедневная статистика сохраняется в каталоге DATAMART_FILE в виде файла Excel.
def incrementalLoad(file):
    print("Performing Incremental Data File based on File provided:", file)
    filename = INCREMENTAL_FILEPATH
    files = glob.glob(filename + '\*.xlsx')
    daily_counter = 1

    for increment in files:
        if increment[-6] == str(daily_counter):
            # print(f'Increment day {daily_counter} ')
            increment_scan = pd.read_excel(increment)

            # DIM CUSTOMER INCREMENT LOAD : OUTER MERGE INCREMENTAL LOAD WITH DIM CUSTOMER TABLE
            increment_customer = DIM_CUSTOMER.copy()
            customer_merger = increment_customer.merge(increment_scan, how='outer')
            customer_merger.reset_index(inplace=True, drop=True)
            cols = ["Customer_UID", "Customer ID", "Customer Name"]
            NEW_DIM_CUSTOMER = customer_merger[cols]
            # df_existing.apply(tuple, 1)
            # df_diff.apply(tuple, 1).isin(df_existing.apply(tuple, 1))
            # OUTPUT
            INCREMENT_DIM_CUSTOMER = DIM_FILEPATH + "\DIM_CUSTOMER.xlsx"
            NEW_DIM_CUSTOMER.to_excel(INCREMENT_DIM_CUSTOMER, index=False)

            # DIM PRODUCT INCREMENT LOAD
            df_dif2 = DIM_PRODUCT.merge(increment_scan, how='outer')
            df_dif2.reset_index(inplace=True, drop=True)
            cols = ["Product_UID", "Product ID", "Product Name", "Category", "Sub-Category", "Audited_Time"]
            df_dif2 = add_time(df_dif2)
            df_dif2 = df_dif2[cols]

            # OUTPUT
            OUTPUT_2 = DIM_FILEPATH + "\DIM_PRODUCT.xlsx"
            df_dif2.to_excel(OUTPUT_2, index=False)

            # FACT DAILY INCREMENTAL LOAD
            global fact_merger
            fact_merger = FACT_DAILY_STATS.merge(increment_scan, how='outer')
            fact_merger.reset_index(inplace=True, drop=True)
            fact_merger['Transaction_UID'] = fact_merger.index + 1
            cols = ["Transaction_UID", "Order ID", "Order Date", "Ship Date", "Ship Mode",
                    "Customer_UID", "Product_UID", "Sales", "Quantity",
                    "Profit", "Audited_Time"]

            fact_merger['Customer_UID'] = fact_merger['Customer_UID'].fillna(round(fact_merger['Customer_UID'].mean()))
            fact_merger['Product_UID'] = fact_merger['Product_UID'].fillna(round(fact_merger['Product_UID'].mean()))
            fact_merger.drop_duplicates(inplace=True)
            df_fact = add_time(fact_merger)
            df_fact = df_fact[cols]


            # OUPUT
            OUTPUT_3 = FACT_FILEPATH + "\FACT_DAILY_STATS.xlsx"
            df_fact.to_excel(OUTPUT_3, index=False)

            # t = 20
            # print("Time taken " + str(t)+ "s" )
            # time.sleep(t)
            daily_counter += 1
        else:
            print("end of the day increment")
        computeDailyStatistic()
        break

Функция «computeDailyStatistic» вычисляет и извлекает ежедневную статистику продаж из таблицы FACT_DAILY_STATS. Эти статистические данные дают комплексное представление об эффективности продаж с течением времени, способствуя лучшему пониманию и стратегическому планированию. Давайте углубимся во внутреннюю работу этой функции и ее значение для раскрытия информации, основанной на данных.

  1. Подготовка данных. Функция «computeDailyStatistic» начинается с чтения таблицы FACT_DAILY_STATS из ранее обновленного файла Excel, который хранится в «OUTPUT_3». Затем данные копируются, чтобы гарантировать, что любые изменения во время вычислений не повлияют на исходные данные.
  2. Выбор данных о продажах: для расчета ежедневной статистики продаж функция фильтрует данные, чтобы сохранить только столбцы «Дата заказа» и «Продажи». Этот выбор жизненно важен, поскольку он изолирует необходимые данные для вычисления статистики.
  3. Группировка по дате заказа: теперь, когда данные о продажах готовы, функция группирует данные по «дате заказа» и агрегирует значения продаж с помощью функции «сумма». Результатом этого шага является новый фрейм данных «DAILY_STATS», где каждая строка представляет статистику продаж за определенную дату.
  4. Расчет кумулятивной статистики: функция рассчитывает различную кумулятивную статистику на основе ежедневных данных о продажах. Эти статистические данные включают совокупную сумму, среднее, минимальное и максимальное значения продаж до каждой соответствующей даты. Отслеживая эти показатели, аналитики могут получить представление об общей эффективности продаж с течением времени.
  5. Добавление уникальности: чтобы каждая строка в кадре данных «DAILY_STATS» была уникальной, вводится дополнительный столбец с именем «SALES_UID». В этом столбце каждой строке присваивается уникальный идентификатор, что упрощает отслеживание и ссылки.
  6. Экспорт результатов. Последний шаг включает в себя экспорт рассчитанной статистики ежедневных продаж в файл Excel с именем «DAILY_STATS.xlsx». Этот файл сохраняется в папке «04DATAMART», что обеспечивает легкий доступ для дальнейшего анализа и составления отчетов.
#compute the daily statistic of sales and output the data to 04DATAMART folder
def computeDailyStatistic():
    print("Computing Statistics and output to Daily_Sales.xlsx\n")

    daily_stats = pd.read_excel(OUTPUT_3)
    daily_statistics = daily_stats.copy()

    sales_data = daily_statistics.copy()
    keep = ['Order Date', 'Sales']
    sales_data = sales_data.filter(items=keep)
    DAILY_STATS = sales_data.groupby('Order Date').agg({'Sales': sum})
    DAILY_STATS = DAILY_STATS.reset_index()
    DAILY_STATS['SALES_SUM_CUMULATIVE'] = DAILY_STATS['Sales'].cumsum()
    DAILY_STATS['SALES_MEAN_CUMULATIVE'] = DAILY_STATS['Sales'].expanding().mean()
    DAILY_STATS['SALES_MIN_CUMULATIVE'] = DAILY_STATS['Sales'].cummin()
    DAILY_STATS['SALES_MAX_CUMULATIVE'] = DAILY_STATS['Sales'].cummax()

    # additional column
    DAILY_STATS['SALES_UID'] = DAILY_STATS.index + 1

    # THE OUTPUT
    DAILY_STATS_OUTPUT = DATAMART_FILE + "\DAILY_STATS.xlsx"
    DAILY_STATS.to_excel(DAILY_STATS_OUTPUT, index=False)

«Основная» функция служит проводником, организуя весь процесс ETL (извлечение, преобразование, загрузка). Эта функция наблюдает за важнейшими этапами загрузки исходных данных и имитации ежедневной дополнительной загрузки данных. Интегрируя различные компоненты и функции, «основная» функция оптимизирует рабочий процесс ETL, обеспечивая эффективную обработку и интеграцию данных.

def main():
    ## Performing Initial Load
    print("Start of ETL.Performing Initial Load.")
    initialLoad(df, DIM_FILEPATH, FACT_FILEPATH)

    ## Simulating Daily Load by processing file one by one in Incremental
    print("Performing Incremental Data Loading")
    directory = os.fsencode(INCREMENTAL_FILEPATH)

    for file in os.listdir(directory):
        if (~(file.startswith(b"~$")) & file.endswith(b".xlsx")):
            filename = os.path.join(directory, file)
            incrementalLoad(filename)
            # t = 20
            # print("Time taken " + str(t) + "s")
            # time.sleep(t)

    print("End of ETL Program")


if __name__ == '__main__':
    main()

Заключение

В заключение следует отметить, что проект по интеграции и кластеризации данных стал важным мероприятием, которое способствовало принятию обоснованных решений и получению ценных сведений из различных наборов данных. На протяжении всего проекта мы сосредоточились на ключевых аспектах, включая начальную загрузку данных, добавочные обновления данных и вычисление ежедневной статистики.

Во время первоначальной загрузки данных мы создали необходимые таблицы, а именно DIM_CUSTOMER, DIM_PRODUCT и FACT_DAILY_STATS, извлекая и систематизируя данные из каталога «/01Source/Initial». Эти таблицы служат структурированным хранилищем информации о клиентах, сведений о продуктах и ​​ежедневной статистики продаж, закладывая основу для дальнейшего анализа.

Впоследствии процесс добавочной загрузки данных касался обновлений и новых записей в таблицах DIM и FACT. Используя операции слияния, мы гарантировали, что таблицы DIM_CUSTOMER и DIM_PRODUCT будут оставаться актуальными, без проблем внося новую информацию. Кроме того, мы ежедневно обрабатывали файлы данных из каталога «/01Source/Incremental», внося соответствующие изменения в существующие таблицы.

Чтобы улучшить аудит и отслеживание данных, мы представили функцию «add_time», которая фиксирует метку времени загрузки данных, предоставляя ценную информацию о происхождении и эволюции данных.

Проект также включал ежедневную статистику продаж, что является важным шагом в создании значимой бизнес-аналитики. Результирующая таблица DAILY_STATS содержит совокупные показатели продаж, включая общий объем продаж, средние продажи, минимальные продажи и максимальные продажи с течением времени.

Наконец, основной процесс ETL (извлечение, преобразование, загрузка) организовал различные шаги, выполняя начальную загрузку и имитируя ежедневные обновления данных. Этот комплексный подход гарантирует, что данные остаются точными, надежными и способствуют эффективному принятию решений.

В заключение следует отметить, что успешное выполнение проекта по интеграции и кластеризации данных предоставляет компаниям инструменты для использования возможностей данных, что позволяет им принимать обоснованные решения, выявлять закономерности и получать ценную информацию для получения конкурентного преимущества. Поскольку методы работы с данными продолжают формировать ландшафт современных предприятий, надежные процессы интеграции и кластеризации становятся критически важными для использования всего потенциала данных и обеспечения успеха организации.