У меня возникла странная проблема, когда я пытаюсь прочитать csv из ведра gcp и записать в то же ведро. Обратите внимание, что приведенный ниже код раньше работал у меня, но теперь в журналах воздушного потока возникает исключение, в котором говорится:
{models.py:1796} ERROR - Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffe
when reading gs://file_bucket/abc.csv
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 103, in execut
return_value = self.execute_callable(
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 108, in execute_callabl
return self.python_callable(*self.op_args, **self.op_kwargs
File "/home/airflow/gcs/dags/handle_split_rows.py", line 56, in handle_split_row
lines= file_stream.read(
File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 132, in rea
pywrap_tensorflow.ReadFromStream(self._read_buf, length, status)
File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 528, in __exit_
c_api.TF_GetCode(self.status.status)
tensorflow.python.framework.errors_impl.FailedPreconditionError: Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffe
when reading gs://file_bucket/abc.csv
код:
#!/usr/bin/env python import os import json from datetime import datetime, timedelta from airflow import DAG from airflow.models import Variable from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator from airflow.utils.trigger_rule import TriggerRule from airflow.operators import python_operator from airflow.contrib.hooks import gcs_hook from airflow.contrib.operators.bigquery_operator import BigQueryOperator from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator from airflow.operators.sensors import ExternalTaskSensor from airflow.operators import PythonOperator, BranchPythonOperator from airflow.operators import BashOperator from lib import notification_utility default_args = { 'owner': os.environ["OWNER"], 'depends_on_past': False, 'start_date': '2019-10-10 09:31:00' } with DAG('parse_bad_rows', default_args=default_args, catchup=False, schedule_interval= None ) as dag: def parse_rows(**context): import pandas as pd import numpy as np import csv import os import gcsfs from tensorflow.python.lib.io import file_io from pandas.compat import StringIO import io #**tf.disable_v2_behavior() also tried disabling v1 just in case but i dont think it makes any sense** #updated_file_list = context['ti'].xcom_pull(task_ids='list_files_delta_bucket_test') fs = gcsfs.GCSFileSystem(project='project_name') updated_file_list = fs.ls('/bucket_name/foldername/') updated_file_list = [ x for x in updated_file_list if "abc" in x ] print("updated_file_list------------------>",updated_file_list) for f in updated_file_list: print("File Being processed------->",f) file_name = os.path.splitext(f)[0] #**this is where the job is failing while reading the file so I am assuming it has to do something with tensorflow.python.lib.io import file_io** file_stream = file_io.FileIO("gs://"+f, mode='r') lines= file_stream.read() file_stream_less_cols =io.StringIO(lines) Split_Rows = [x for x in file_stream_less_cols if x.count('|') < 297] Split_Rows = ' '.join(map(str, Split_Rows)) file_stream.close() Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows),columns=['BLOB_COLUMN'],dtype='str') #Split_Rows_Stream['File_Name'] = Split_Rows_Stream.index parse_names = file_name.split('/') filename = parse_names[2] bucketname = parse_names[0] Split_Rows_Stream['FILE_NAME'] = filename print("bucketname------------>",bucketname) print("filename------------->",filename) Split_Rows_Stream.to_csv("gs://"+bucketname+"/ERROR_FILES/"+filename+".csv",encoding='utf-8',quoting=csv.QUOTE_NONE,escapechar='|') Python_Task_Split_Rows = PythonOperator( task_id= 'split_rows_to_process_test', provide_context=True, python_callable=parse_rows, #op_kwargs={'project':'project_name','bucket':'bucket_name','table_name':'abc','delim_num':297}, #trigger_rule=TriggerRule.ALL_SUCCESS, dag=dag ) # Orchestration Python_Task_Split_Rows
Я также пробовал то же самое в локальном режиме, чтобы убедиться, что csv не является проблемой.
import pandas as pd
import numpy as np
import csv
import io
import os
#Read the file
directory='c:\\Users\BG/Downloads/file_Cleansing'
for filename in os.listdir(directory):
file_name = filename.split('.')[0]
f=open('c:\\Users\BG/Downloads/file_Cleansing/'+filename,'r',encoding="utf8")
#Readlines forom the text file
lines= f.read()
#cleanse the lines
file_stream =io.StringIO(lines)
Split_Rows = [x for x in file_stream if x.count('|') < 297]
Split_Rows = ' '.join(map(str, Split_Rows))
f.close()
Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows),columns=['blob'])
Split_Rows_Stream["File_Name"] = file_name
Split_Rows_Stream.to_csv("c:\\Users\BG/Downloads/file_Cleansed/'+filename+"_error.csv",escapechar='|',encoding='utf-8')
вышеупомянутое сработало, как ожидалось. Моя цель - найти записи, которые не соответствуют количеству разделителей, ожидаемых для строки (в основном мой разделитель - это канал, и в каждой строке ожидается 297 каналов, так как в этом CSV 298 столбцов, но в некоторых строках у меня есть канал между данными .)
и захватите эти записи и загрузите их в csv, а затем в таблицу в bigquery для обратного объединения строк (используя sql lead или lag, поскольку я использую имя файла и номер индекса для упорядочивания и группировки), чтобы восстановить и восстановить столько записей, сколько возможный.
Также, наконец, моя учетная запись службы была изменена, это может быть проблема с разрешением на GCP.
любой совет приветствуется.
Спасибо за уделенное время.