Простой проект на конвейерах ETL в Python
Добро пожаловать в мир интеграции и кластеризации данных! В этом блоге мы углубимся в интригующее задание, демонстрирующее возможности Python в управлении и анализе данных из различных источников. В нашу задачу входит построение таблиц измерений и фактов, выполнение инкрементной загрузки данных и вычисление ежедневной статистики. Давайте рассмотрим, как мы достигаем этих целей шаг за шагом.
_____________________________________________________________
Чтобы начать наше путешествие, мы начнем с начального набора данных, расположенного в каталоге 01SOURCE/INITIAL в Datalake. Этот набор данных под названием «Sales Superstore.xlsx» служит нашей основой. Первая часть нашего задания включает в себя создание таблиц DIM_CUSTOMER, DIM_PRODUCT и FACT_DAILY_STATS на основе этих исходных данных.
Таблица DIM_CUSTOMER: таблица DIM_CUSTOMER содержит важную информацию о клиенте, такую как идентификатор клиента и имя клиента. Используя библиотеку pandas в Python, мы фильтруем и удаляем дубликаты из исходного набора данных, чтобы создать таблицу DIM_CUSTOMER. Для обеспечения уникальности мы присваиваем каждому клиенту уникальный идентификатор (Customer_UID).
Таблица DIM_PRODUCT. Подобно таблице DIM_CUSTOMER, в таблице DIM_PRODUCT хранятся данные, связанные с продуктом, включая идентификатор продукта, название продукта, категорию и подкатегорию. Опять же, мы удаляем дубликаты и присваиваем каждому продукту уникальный Product_UID.
Таблица FACT_DAILY_STATS. Таблица FACT_DAILY_STATS служит хранилищем ежедневной статистики продаж. Мы объединяем таблицы DIM_CUSTOMER и DIM_PRODUCT с исходным набором данных на основе общих ключей (Customer ID и Product ID соответственно). Этот процесс слияния позволяет нам получить исчерпывающий набор данных с подробной информацией о транзакциях. С помощью Transaction_UID, назначенного каждой записи, мы собираем важные сведения, такие как идентификатор заказа, дата заказа, дата отгрузки, режим отгрузки, продажи, количество и прибыль.
import pandas as pd import os import glob import time # To change the Datalake starting directory accordingly GLOBAL_START_FILEPATH = r"C:\Users\matth\Documents\DATALAKE" SOURCE_FILEPATH = GLOBAL_START_FILEPATH + r"\01SOURCE\INITIAL" INCREMENTAL_FILEPATH = GLOBAL_START_FILEPATH + r"\01SOURCE\INCREMENTAL" SOURCE = SOURCE_FILEPATH + "\Sales Superstore.xlsx" df = pd.read_excel(SOURCE) DIM_FILEPATH = GLOBAL_START_FILEPATH + r"\02DIM" FACT_FILEPATH = GLOBAL_START_FILEPATH + r"\03FACT" DATAMART_FILE = GLOBAL_START_FILEPATH + r"\04DATAMART"
- Отслеживание Created_Time: в основе функции «add_time» лежит возможность отслеживать «Created_Time». Используя библиотеку pandas, он создает новый столбец с именем «Created_Time» и заполняет его текущей отметкой времени с помощью функции «pd.Timestamp(‘now’)». Эта отметка времени отмечает точный момент начала процесса загрузки данных, фиксируя точное время начала интеграции данных.
- Генерация Audited_Time: Затем функция генерирует «Audited_Time», который является форматированным представлением «Created_Time» для целей аудита. Для этого он использует функцию «pd.to_datetime()» pandas для преобразования столбца «Created_Time» в формат даты и времени. Впоследствии параметр «format=’%H:%M’» используется для указания желаемого формата времени в виде часов и минут (ЧЧ:ММ). Результатом стал новый столбец с именем «Audited_Time», обеспечивающий четкое и удобочитаемое представление времени начала интеграции данных.
- Повышение целостности данных: для дальнейшего обеспечения целостности данных столбец «Created_Time» удаляется из DataFrame с помощью «del sd[‘Created_Time’]». Этот шаг устраняет любую путаницу и предотвращает избыточность в наборе данных, оставляя после себя только ценную информацию, включая «Audited_Time.
## function to track the actual time of initial loading and increment loading for auditing def add_time(sd): sd['Created_Time'] = pd.Timestamp("now") # sd['new_date'] = pd.to_datetime(sd['Created_Time'], format='%Y-%m-%d').dt.date sd['Audited_Time'] = pd.to_datetime(sd['Created_Time'], format='%H:%M').dt.time del sd['Created_Time'] return sd
Функция «initialLoad». Функция «initialLoad» служит краеугольным камнем нашего путешествия по интеграции данных, эффективно создавая таблицы DIM и FACT на основе исходного источника данных.
- Построение DIM_CUSTOMER: функция «initialLoad» начинается с построения таблицы DIM_CUSTOMER. Он идентифицирует и выбирает соответствующие столбцы («Идентификатор клиента» и «Имя клиента») из исходного источника данных («df»). Используя функцию «фильтра» и удаляя все повторяющиеся строки, он гарантирует, что таблица DIM_CUSTOMER содержит уникальные данные о клиентах. «Customer_UID» присваивается последовательно каждой записи клиента, устанавливая уникальный идентификатор для дальнейшего использования. Впоследствии таблица DIM_CUSTOMER сохраняется в виде файла Excel, что облегчает дальнейшую обработку и анализ данных.
- Создание DIM_PRODUCT. Следуя той же методике, функция переходит к созданию таблицы DIM_PRODUCT. Здесь соответствующие столбцы («Идентификатор продукта», «Название продукта», «Категория» и «Подкатегория») фильтруются, а дубликаты удаляются. Подобно DIM_CUSTOMER, каждой записи продукта присваивается уникальный «Product_UID», что упрощает последующую обработку данных. Таблица DIM_PRODUCT также сохраняется в виде файла Excel, готового к включению в рабочий процесс интеграции данных.
- Генерация FACT_DAILY_STATS: следующей на очереди является таблица FACT_DAILY_STATS, которая представляет собой кульминацию первоначальных усилий по интеграции данных. Используя информацию из DIM_CUSTOMER и DIM_PRODUCT, функция «initialLoad» объединяет исходный набор данных с этими измерениями для создания комплексной таблицы FACT_DAILY_STATS. Он эффективно объединяет различные поля, такие как «Идентификатор заказа», «Дата заказа», «Дата отгрузки», «Режим доставки», «Customer_UID», «Product_UID», «Продажи», «Количество» и «Прибыль» в одном , сплоченный стол. Кроме того, каждой транзакции присваивается уникальный «Transaction_UID», что облегчает эффективное отслеживание данных. В качестве последнего шага таблица FACT_DAILY_STATS сохраняется в виде файла Excel, что завершает первоначальный процесс интеграции данных.
def initialLoad(df, DIM_FILEPATH, FACT_FILEPATH): print("Constructing DIM and FACT Tables based on /01Source/Initial") #DIM_CUSTOMER keep = ["Customer ID", "Customer Name"] global DIM_CUSTOMER DIM_CUSTOMER = df.filter(items=keep) DIM_CUSTOMER.drop_duplicates(inplace=True) DIM_CUSTOMER.reset_index(inplace=True, drop=True) DIM_CUSTOMER['Customer_UID'] = DIM_CUSTOMER.index + 1 columns = ["Customer_UID", "Customer ID", "Customer Name"] DIM_CUSTOMER = DIM_CUSTOMER[columns] # DIM_CUSTOMER OUTPUT OUTPUT = DIM_FILEPATH + "\DIM_CUSTOMER.xlsx" DIM_CUSTOMER.to_excel(OUTPUT, index=False) # DIM_PRODUCT keep = ["Product ID", "Product Name", "Category", "Sub-Category"] global DIM_PRODUCT DIM_PRODUCT = df.filter(items=keep) DIM_PRODUCT.drop_duplicates(inplace=True) DIM_PRODUCT.reset_index(inplace=True, drop=True) DIM_PRODUCT['Product_UID'] = DIM_PRODUCT.index + 1 temp_cols = DIM_PRODUCT.columns.tolist() new_cols = temp_cols[-1:] + temp_cols[:-1] DIM_PRODUCT = DIM_PRODUCT[new_cols] # DIM_PRODUCT OUTPUT OUTPUT_2 = DIM_FILEPATH + "\DIM_PRODUCT.xlsx" DIM_PRODUCT.to_excel(OUTPUT_2, index=False) # FACT_DAILY_STATS df_copy = df.copy() customer_merger = df_copy.merge(DIM_CUSTOMER, how='inner', on=['Customer ID'], suffixes=('_left', '_right')) complete_merger = customer_merger.merge(DIM_PRODUCT, how='inner', on=['Product ID'], suffixes=('_left', '_right')) drop_fields = ['Customer ID', 'Customer Name_left', 'Product ID', 'Category_left', 'Sub-Category_left', 'Product Name_left', 'Customer Name_right', 'Product Name_right', 'Sub-Category_right', 'Category_right'] complete_merger.drop(labels=drop_fields, axis=1, inplace=True) complete_merger.drop_duplicates(inplace=True) complete_merger.reset_index(inplace=True, drop=True) complete_merger['Transaction_UID'] = complete_merger.index + 1 cols = ["Transaction_UID", "Order ID", "Order Date", "Ship Date", "Ship Mode", "Customer_UID", "Product_UID", "Sales", "Quantity", "Profit"] global FACT_DAILY_STATS FACT_DAILY_STATS = complete_merger[cols] FACT_DAILY_STATS.reset_index(inplace=True, drop=True) FACT_DAILY_STATS['Transaction_UID'] = FACT_DAILY_STATS.index + 1 # FACT_DAILY_STATS OUTPUT global OUTPUT_3 OUTPUT_3 = FACT_FILEPATH + "\FACT_DAILY_STATS.xlsx" FACT_DAILY_STATS.to_excel(OUTPUT_3, index=False) #getting data from the incremental files to insert new rows / update the existing row in DIM and FACT tables #updates by replacing records with new values from incremental files, insert new rows if records are absent from source
Добавочная загрузка данных: построение на фундаменте
Загрузка исходных данных закладывает основу для всестороннего анализа. Однако на этом задача не заканчивается; данные постоянно развиваются и обновляются, что требует постепенного процесса загрузки данных для поддержания актуальности и актуальности информации. Функция «incrementalLoad» играет ключевую роль в этом процессе, эффективно обновляя таблицы DIM и FACT на основе файлов добавочных данных. Давайте рассмотрим, как эта функция обеспечивает плавную добавочную интеграцию данных.
- Выбор файла данных: функция «incrementalLoad» начинается с выбора файлов из каталога «INCREMENTAL_FILEPATH». Эти файлы представляют собой добавочные обновления данных, регулярно получаемые для поддержания актуальности и актуальности данных. Используя функцию «glob.glob», функция захватывает все файлы Excel в указанном каталоге.
- Интеграция инкрементных данных. В основе функции «incrementalLoad» лежит процесс интеграции инкрементных данных. Он начинается с сканирования каждого файла один за другим для определения ежедневных приращений. На основе ежедневного счетчика функция обрабатывает каждый файл и соответствующим образом обновляет таблицы DIM и FACT.
- DIM CUSTOMER INCREMENT LOAD: для инкрементного обновления таблицы DIM_CUSTOMER функция выполняет «внешнее слияние» между существующим DIM_CUSTOMER и инкрементными данными из выбранного файла. Результатом слияния является полный набор данных о клиентах, как существующих, так и вновь добавленных. «Customer_UID» переиндексируется для сохранения уникальности, а обновленная таблица DIM_CUSTOMER сохраняется в виде файла Excel.
- DIM PRODUCT INCREMENT LOAD: функция также постепенно обновляет таблицу DIM_PRODUCT. Он объединяет существующий DIM_PRODUCT с добавочными данными и пересчитывает «Product_UID» для сохранения уникальности. Кроме того, функция «add_time» используется для отслеживания «Audited_Time» этого обновления. Обновленная таблица DIM_PRODUCT затем сохраняется в виде файла Excel.
- ФАКТИЧЕСКАЯ ЕЖЕДНЕВНАЯ ИНКРЕМЕНТНАЯ НАГРУЗКА. Точно так же таблица FACT_DAILY_STATS подвергается инкрементным обновлениям. Функция выполняет «внешнее слияние» между существующим FACT_DAILY_STATS и добавочными данными. «Transaction_UID» переиндексируется для обеспечения уникальности, а отсутствующие «Customer_UID» и «Product_UID» заменяются средними значениями. Как и в случае с DIM_PRODUCT, функция «add_time» отслеживает «Audited_Time» этого обновления. Обновленная таблица FACT_DAILY_STATS сохраняется в виде файла Excel.
- Вычисление ежедневной статистики: после каждого добавочного обновления функция вызывает функцию «computeDailyStatistic» для вычисления ежедневной статистики продаж на основе обновленной таблицы FACT_DAILY_STATS. Эта ежедневная статистика сохраняется в каталоге DATAMART_FILE в виде файла Excel.
def incrementalLoad(file): print("Performing Incremental Data File based on File provided:", file) filename = INCREMENTAL_FILEPATH files = glob.glob(filename + '\*.xlsx') daily_counter = 1 for increment in files: if increment[-6] == str(daily_counter): # print(f'Increment day {daily_counter} ') increment_scan = pd.read_excel(increment) # DIM CUSTOMER INCREMENT LOAD : OUTER MERGE INCREMENTAL LOAD WITH DIM CUSTOMER TABLE increment_customer = DIM_CUSTOMER.copy() customer_merger = increment_customer.merge(increment_scan, how='outer') customer_merger.reset_index(inplace=True, drop=True) cols = ["Customer_UID", "Customer ID", "Customer Name"] NEW_DIM_CUSTOMER = customer_merger[cols] # df_existing.apply(tuple, 1) # df_diff.apply(tuple, 1).isin(df_existing.apply(tuple, 1)) # OUTPUT INCREMENT_DIM_CUSTOMER = DIM_FILEPATH + "\DIM_CUSTOMER.xlsx" NEW_DIM_CUSTOMER.to_excel(INCREMENT_DIM_CUSTOMER, index=False) # DIM PRODUCT INCREMENT LOAD df_dif2 = DIM_PRODUCT.merge(increment_scan, how='outer') df_dif2.reset_index(inplace=True, drop=True) cols = ["Product_UID", "Product ID", "Product Name", "Category", "Sub-Category", "Audited_Time"] df_dif2 = add_time(df_dif2) df_dif2 = df_dif2[cols] # OUTPUT OUTPUT_2 = DIM_FILEPATH + "\DIM_PRODUCT.xlsx" df_dif2.to_excel(OUTPUT_2, index=False) # FACT DAILY INCREMENTAL LOAD global fact_merger fact_merger = FACT_DAILY_STATS.merge(increment_scan, how='outer') fact_merger.reset_index(inplace=True, drop=True) fact_merger['Transaction_UID'] = fact_merger.index + 1 cols = ["Transaction_UID", "Order ID", "Order Date", "Ship Date", "Ship Mode", "Customer_UID", "Product_UID", "Sales", "Quantity", "Profit", "Audited_Time"] fact_merger['Customer_UID'] = fact_merger['Customer_UID'].fillna(round(fact_merger['Customer_UID'].mean())) fact_merger['Product_UID'] = fact_merger['Product_UID'].fillna(round(fact_merger['Product_UID'].mean())) fact_merger.drop_duplicates(inplace=True) df_fact = add_time(fact_merger) df_fact = df_fact[cols] # OUPUT OUTPUT_3 = FACT_FILEPATH + "\FACT_DAILY_STATS.xlsx" df_fact.to_excel(OUTPUT_3, index=False) # t = 20 # print("Time taken " + str(t)+ "s" ) # time.sleep(t) daily_counter += 1 else: print("end of the day increment") computeDailyStatistic() break
Функция «computeDailyStatistic» вычисляет и извлекает ежедневную статистику продаж из таблицы FACT_DAILY_STATS. Эти статистические данные дают комплексное представление об эффективности продаж с течением времени, способствуя лучшему пониманию и стратегическому планированию. Давайте углубимся во внутреннюю работу этой функции и ее значение для раскрытия информации, основанной на данных.
- Подготовка данных. Функция «computeDailyStatistic» начинается с чтения таблицы FACT_DAILY_STATS из ранее обновленного файла Excel, который хранится в «OUTPUT_3». Затем данные копируются, чтобы гарантировать, что любые изменения во время вычислений не повлияют на исходные данные.
- Выбор данных о продажах: для расчета ежедневной статистики продаж функция фильтрует данные, чтобы сохранить только столбцы «Дата заказа» и «Продажи». Этот выбор жизненно важен, поскольку он изолирует необходимые данные для вычисления статистики.
- Группировка по дате заказа: теперь, когда данные о продажах готовы, функция группирует данные по «дате заказа» и агрегирует значения продаж с помощью функции «сумма». Результатом этого шага является новый фрейм данных «DAILY_STATS», где каждая строка представляет статистику продаж за определенную дату.
- Расчет кумулятивной статистики: функция рассчитывает различную кумулятивную статистику на основе ежедневных данных о продажах. Эти статистические данные включают совокупную сумму, среднее, минимальное и максимальное значения продаж до каждой соответствующей даты. Отслеживая эти показатели, аналитики могут получить представление об общей эффективности продаж с течением времени.
- Добавление уникальности: чтобы каждая строка в кадре данных «DAILY_STATS» была уникальной, вводится дополнительный столбец с именем «SALES_UID». В этом столбце каждой строке присваивается уникальный идентификатор, что упрощает отслеживание и ссылки.
- Экспорт результатов. Последний шаг включает в себя экспорт рассчитанной статистики ежедневных продаж в файл Excel с именем «DAILY_STATS.xlsx». Этот файл сохраняется в папке «04DATAMART», что обеспечивает легкий доступ для дальнейшего анализа и составления отчетов.
#compute the daily statistic of sales and output the data to 04DATAMART folder def computeDailyStatistic(): print("Computing Statistics and output to Daily_Sales.xlsx\n") daily_stats = pd.read_excel(OUTPUT_3) daily_statistics = daily_stats.copy() sales_data = daily_statistics.copy() keep = ['Order Date', 'Sales'] sales_data = sales_data.filter(items=keep) DAILY_STATS = sales_data.groupby('Order Date').agg({'Sales': sum}) DAILY_STATS = DAILY_STATS.reset_index() DAILY_STATS['SALES_SUM_CUMULATIVE'] = DAILY_STATS['Sales'].cumsum() DAILY_STATS['SALES_MEAN_CUMULATIVE'] = DAILY_STATS['Sales'].expanding().mean() DAILY_STATS['SALES_MIN_CUMULATIVE'] = DAILY_STATS['Sales'].cummin() DAILY_STATS['SALES_MAX_CUMULATIVE'] = DAILY_STATS['Sales'].cummax() # additional column DAILY_STATS['SALES_UID'] = DAILY_STATS.index + 1 # THE OUTPUT DAILY_STATS_OUTPUT = DATAMART_FILE + "\DAILY_STATS.xlsx" DAILY_STATS.to_excel(DAILY_STATS_OUTPUT, index=False)
«Основная» функция служит проводником, организуя весь процесс ETL (извлечение, преобразование, загрузка). Эта функция наблюдает за важнейшими этапами загрузки исходных данных и имитации ежедневной дополнительной загрузки данных. Интегрируя различные компоненты и функции, «основная» функция оптимизирует рабочий процесс ETL, обеспечивая эффективную обработку и интеграцию данных.
def main(): ## Performing Initial Load print("Start of ETL.Performing Initial Load.") initialLoad(df, DIM_FILEPATH, FACT_FILEPATH) ## Simulating Daily Load by processing file one by one in Incremental print("Performing Incremental Data Loading") directory = os.fsencode(INCREMENTAL_FILEPATH) for file in os.listdir(directory): if (~(file.startswith(b"~$")) & file.endswith(b".xlsx")): filename = os.path.join(directory, file) incrementalLoad(filename) # t = 20 # print("Time taken " + str(t) + "s") # time.sleep(t) print("End of ETL Program") if __name__ == '__main__': main()
Заключение
В заключение следует отметить, что проект по интеграции и кластеризации данных стал важным мероприятием, которое способствовало принятию обоснованных решений и получению ценных сведений из различных наборов данных. На протяжении всего проекта мы сосредоточились на ключевых аспектах, включая начальную загрузку данных, добавочные обновления данных и вычисление ежедневной статистики.
Во время первоначальной загрузки данных мы создали необходимые таблицы, а именно DIM_CUSTOMER, DIM_PRODUCT и FACT_DAILY_STATS, извлекая и систематизируя данные из каталога «/01Source/Initial». Эти таблицы служат структурированным хранилищем информации о клиентах, сведений о продуктах и ежедневной статистики продаж, закладывая основу для дальнейшего анализа.
Впоследствии процесс добавочной загрузки данных касался обновлений и новых записей в таблицах DIM и FACT. Используя операции слияния, мы гарантировали, что таблицы DIM_CUSTOMER и DIM_PRODUCT будут оставаться актуальными, без проблем внося новую информацию. Кроме того, мы ежедневно обрабатывали файлы данных из каталога «/01Source/Incremental», внося соответствующие изменения в существующие таблицы.
Чтобы улучшить аудит и отслеживание данных, мы представили функцию «add_time», которая фиксирует метку времени загрузки данных, предоставляя ценную информацию о происхождении и эволюции данных.
Проект также включал ежедневную статистику продаж, что является важным шагом в создании значимой бизнес-аналитики. Результирующая таблица DAILY_STATS содержит совокупные показатели продаж, включая общий объем продаж, средние продажи, минимальные продажи и максимальные продажи с течением времени.
Наконец, основной процесс ETL (извлечение, преобразование, загрузка) организовал различные шаги, выполняя начальную загрузку и имитируя ежедневные обновления данных. Этот комплексный подход гарантирует, что данные остаются точными, надежными и способствуют эффективному принятию решений.
В заключение следует отметить, что успешное выполнение проекта по интеграции и кластеризации данных предоставляет компаниям инструменты для использования возможностей данных, что позволяет им принимать обоснованные решения, выявлять закономерности и получать ценную информацию для получения конкурентного преимущества. Поскольку методы работы с данными продолжают формировать ландшафт современных предприятий, надежные процессы интеграции и кластеризации становятся критически важными для использования всего потенциала данных и обеспечения успеха организации.