прочитать CSV и загрузить в ведро gcp с помощью проблемы с компилятором Google Cloud

У меня возникла странная проблема, когда я пытаюсь прочитать csv из ведра gcp и записать в то же ведро. Обратите внимание, что приведенный ниже код раньше работал у меня, но теперь в журналах воздушного потока возникает исключение, в котором говорится:

{models.py:1796} ERROR - Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffe
     when reading gs://file_bucket/abc.csv
Traceback (most recent call last)
  File "/usr/local/lib/airflow/airflow/models.py", line 1664, in _run_raw_tas
    result = task_copy.execute(context=context
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 103, in execut
    return_value = self.execute_callable(
  File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 108, in execute_callabl
    return self.python_callable(*self.op_args, **self.op_kwargs
  File "/home/airflow/gcs/dags/handle_split_rows.py", line 56, in handle_split_row
    lines= file_stream.read(
  File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/lib/io/file_io.py", line 132, in rea
    pywrap_tensorflow.ReadFromStream(self._read_buf, length, status)
  File "/opt/python3.6/lib/python3.6/site-packages/tensorflow/python/framework/errors_impl.py", line 528, in __exit_
    c_api.TF_GetCode(self.status.status)
tensorflow.python.framework.errors_impl.FailedPreconditionError: Error executing an HTTP request: libcurl code 23 meaning 'Failed writing received data to disk/application', error details: Received 134221820 response bytes for a 134217728-byte buffe
     when reading gs://file_bucket/abc.csv

код:

#!/usr/bin/env python
import os
import json
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import python_operator
from airflow.contrib.hooks import gcs_hook
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators import PythonOperator, BranchPythonOperator
from airflow.operators import BashOperator
from lib import notification_utility

default_args = {
    'owner': os.environ["OWNER"],
    'depends_on_past': False,
    'start_date': '2019-10-10 09:31:00'
}

with DAG('parse_bad_rows',
    default_args=default_args,
    catchup=False,
    schedule_interval= None
) as dag:
    def parse_rows(**context):
        import pandas as pd
        import numpy as np
        import csv
        import os
        import gcsfs
        from tensorflow.python.lib.io import file_io
        from pandas.compat import StringIO
        import io
        #**tf.disable_v2_behavior() also tried disabling v1 just in case but i dont think it makes any sense**
        #updated_file_list = context['ti'].xcom_pull(task_ids='list_files_delta_bucket_test')
        fs = gcsfs.GCSFileSystem(project='project_name')
        updated_file_list = fs.ls('/bucket_name/foldername/')
        updated_file_list = [ x for x in updated_file_list if "abc" in x ]
        print("updated_file_list------------------>",updated_file_list)
        for f in updated_file_list:
            print("File Being processed------->",f)
            file_name = os.path.splitext(f)[0]
           #**this is where the job is failing while reading the file so I am assuming it has to do something with tensorflow.python.lib.io import file_io**
            file_stream = file_io.FileIO("gs://"+f, mode='r')
            lines= file_stream.read()
            file_stream_less_cols   =io.StringIO(lines)
            Split_Rows = [x for x in file_stream_less_cols if x.count('|') < 297]
            Split_Rows = ' '.join(map(str, Split_Rows))
            file_stream.close()
            Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows),columns=['BLOB_COLUMN'],dtype='str')
            #Split_Rows_Stream['File_Name'] = Split_Rows_Stream.index
            parse_names = file_name.split('/')
            filename = parse_names[2]
            bucketname  = parse_names[0]
            Split_Rows_Stream['FILE_NAME'] = filename
            print("bucketname------------>",bucketname)
            print("filename------------->",filename)
            Split_Rows_Stream.to_csv("gs://"+bucketname+"/ERROR_FILES/"+filename+".csv",encoding='utf-8',quoting=csv.QUOTE_NONE,escapechar='|')

    Python_Task_Split_Rows = PythonOperator(
                             task_id= 'split_rows_to_process_test',
                             provide_context=True,
                             python_callable=parse_rows,
                             #op_kwargs={'project':'project_name','bucket':'bucket_name','table_name':'abc','delim_num':297},
                             #trigger_rule=TriggerRule.ALL_SUCCESS,
                             dag=dag
                            )
    # Orchestration
    Python_Task_Split_Rows

Я также пробовал то же самое в локальном режиме, чтобы убедиться, что csv не является проблемой.

import pandas as pd
import numpy as np
import csv
import io
import os
#Read the file
directory='c:\\Users\BG/Downloads/file_Cleansing'
for filename in os.listdir(directory):
    file_name = filename.split('.')[0]
    f=open('c:\\Users\BG/Downloads/file_Cleansing/'+filename,'r',encoding="utf8")
    #Readlines forom the text file
    lines= f.read()
    #cleanse the lines
    file_stream   =io.StringIO(lines)
    Split_Rows    = [x for x in file_stream if x.count('|') < 297]
    Split_Rows = ' '.join(map(str, Split_Rows))
    f.close()
    Split_Rows_Stream = pd.DataFrame(io.StringIO(Split_Rows),columns=['blob'])
    Split_Rows_Stream["File_Name"] = file_name
    Split_Rows_Stream.to_csv("c:\\Users\BG/Downloads/file_Cleansed/'+filename+"_error.csv",escapechar='|',encoding='utf-8')

вышеупомянутое сработало, как ожидалось. Моя цель - найти записи, которые не соответствуют количеству разделителей, ожидаемых для строки (в основном мой разделитель - это канал, и в каждой строке ожидается 297 каналов, так как в этом CSV 298 столбцов, но в некоторых строках у меня есть канал между данными .)

и захватите эти записи и загрузите их в csv, а затем в таблицу в bigquery для обратного объединения строк (используя sql lead или lag, поскольку я использую имя файла и номер индекса для упорядочивания и группировки), чтобы восстановить и восстановить столько записей, сколько возможный.

Также, наконец, моя учетная запись службы была изменена, это может быть проблема с разрешением на GCP.

любой совет приветствуется.

Спасибо за уделенное время.


person kumarm    schedule 07.12.2019    source источник


Ответы (2)


Похоже, это проблема, связанная с [разрешениями] [1], убедитесь, что ваша учетная запись службы указана в сегменте permissions, и если у него есть роль для чтения и / или записи

Я воспроизвел ваш сценарий с вашим кодом, чтобы прочитать файл, и он работает правильно

from tensorflow.python.lib.io import file_io
import gcsfs
import os, io

fs = gcsfs.GCSFileSystem(project='project_name')
updated_file_list = fs.ls('bucket')
updated_file_list = [ x for x in updated_file_list if "filename" in x ]
print("updated_file_list------------------>",updated_file_list)

for f in updated_file_list:
    print("File Being processed------->",f)
    file_name = os.path.splitext(f)[0]
    #**this is where the job is failing while reading the file so I am assuming it has to do something with tensorflow.python.lib.io import file_io**
    file_stream = file_io.FileIO("gs://"+f, mode='r')
    lines= file_stream.read()
    print(lines)

ВЫВОД:

('updated_file_list------------------>', [u'bucket/file'])
('File Being processed------->', u'bucket/file')
this is a text from a file
person ebeltran    schedule 09.12.2019
comment
Спасибо за потраченное время, да, скорее всего, это проблема с разрешением, я до сих пор не понял, где нужно установить разрешения. Тем временем я прочитал файл с помощью Pandas, и он сработал. - person kumarm; 10.12.2019

Да, это может быть проблема с разрешением на GCP. Не могли бы вы также проверить журналы GCS, возможно, вы сможете получить там дополнительную информацию об этой проблеме.

person tclass    schedule 08.12.2019