Проблемы с Python pickling / multiprocessing и Tableau Data Extract API

Я относительно новичок в Python и изо всех сил пытаюсь использовать модуль многопроцессорности для выполнения некоторого преобразования данных с интенсивным использованием ЦП. У меня есть большой кусок данных (~ 400 000 наблюдений ~ 300 переменных) в формате csv, который я хочу преобразовать в извлечение данных таблицы с помощью их Python API. Написание сценария для преобразования несложно, но для его завершения требуется около 15 минут, так как только 1 ЦП выполняет работу (с Tableau Desktop это занимает всего около 90 секунд). Мне нужно использовать все мои 8 ядер, чтобы преобразование прошло быстрее.

Моя первоначальная идея заключалась в том, чтобы разделить данные на 8 частей, заставить по 8 рабочих процессов создавать списки строк таблицы с помощью модуля многопроцессорности, а затем объединить строки в одну таблицу tde. Однако, поскольку объекты / классы строк Tableau определены в отдельном модуле (API tableau), я получаю ошибки травления и указателя. API сложен и основан на ряде других модулей, поэтому все мои попытки восстановить необходимые определения в основном глобальном пространстве потерпели неудачу.

Я пробовал использовать Dill и PiCloud, но обе попытки по-прежнему приводят к ошибкам травления или указателя. Кто-нибудь знает эффективный способ сериализации и / или многопроцессорной обработки вычислений в Python, который полагается на методы / объекты, определенные во внешнем пакете (без необходимости копаться в пакете, чтобы попытаться воссоздать колесо в вашей программе)?

Ниже приведена рабочая программа, которую я хочу использовать в нескольких процессах (я в значительной степени опирался на опубликованную здесь работу Брайана Бикелла http://www.interworks.com/blogs/bbickell/2012/12/06/introduction-python-tableau-data-extract-api-csv-extract-example):

from sys import argv
import os, csv, datetime, time
import dataextract as tde

csv.field_size_limit(10000000)

## Functions

# This function makes adding the columns to each row in the extract a bit easier.
def add_tde_col(colnum, row, val, t):
    # Date format used below
    dateformat = '%Y-%mm-%dd %H:%M:%S.%f'  

    if t == tdeTypes['INTEGER']:
        try: 
            convert = int(val)
            row.setInteger(colnum, convert)
        except ValueError:
            #if we bomb the cast then we just add a null 
            row.setNull(colnum)

    elif t == tdeTypes['DOUBLE']:
        try: 
            convert = float(val)
            row.setDouble(colnum, convert)
        except ValueError:                        
            row.setNull(colnum)

    elif t == tdeTypes['BOOLEAN']:
        try: 
            convert = int(val)
            if convert > -1 and convert <= 1:
                row.setBoolean(colnum, convert)
        else:
            row.setNull(colnum)
        except ValueError:
            row.setNull(colnum)

    elif t == tdeTypes['DATETIME']:
        try:
            d = datetime.datetime.strptime(val, dateformat)
           row.setDate(colnum, d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
        except ValueError:
            row.setNull(colnum)

    elif t == tdeTypes['CHAR_STRING']:
        row.setCharString(colnum, val)

    elif t == tdeTypes['UNICODE_STRING']:
        row.setString(colnum, val)

    else:
        print 'Error'
        row.setNull(colnum)

# define csv input      
inputFile = 'test1.csv'

## Parameters 
tdeFileName = 'tdetest1.tde'

startTime = time.clock()

# Handy dictionary of Tableau data types
tdeTypes = {'INTEGER': 7, 'DOUBLE': 10, 'BOOLEAN': 11, 'DATE': 12, 'DATETIME': 13, 'DURATION': 14, 
            'CHAR_STRING': 15, 'UNICODE_STRING': 16}

## Define CSV Schema in dict, (truncated here for brevity)
csvSchema = []
csvSchema.append({'fAsOfDate': tdeTypes['DATETIME']})
csvSchema.append({'AsOfDate_Max': tdeTypes['DATETIME']})
csvSchema.append({'LoanID': tdeTypes['INTEGER']})
csvSchema.append({'lenderdatabaseid': tdeTypes['INTEGER']})
csvSchema.append({'loanrecordid': tdeTypes['INTEGER']})
csvSchema.append({'random_num': tdeTypes['INTEGER']})


# Try to create extract, delete if found.
try:
    tdeFile = tde.Extract(tdeFileName)
except: 
    os.system('del '+tdeFileName)
    os.system('del DataExtract.log')
    tdeFile = tde.Extract(tdeFileName)

# Open CSV
csvFile = open(inputFile, "rU")
reader = csv.reader(csvFile, delimiter = '^')

print 'Reading records from %s' % (inputFile)

# Create TDE table definition
tdeTableDef = tde.TableDefinition() 

print 'Defined table schema:'

# Build TDE Table Def from csv schema dict
for index, item in enumerate(csvSchema):
    for k, v in item.items():
        print 'Column %i: %s <%s>' % (index, k, tdeTypes.keys() [tdeTypes.values().index(v)])
        tdeTableDef.addColumn(k, v)

# Add table to extract
tdeTable = tdeFile.addTable("Extract",tdeTableDef)

print 'Writing records to %s' % (tdeFileName)

# iterate through rows and columns of csv -> add to tde
rownum = 0
for row in reader:
    if rownum == 0:
        header = row
    else:
        colnum = 0
        tdeRow = tde.Row(tdeTableDef)
        for col in row:
            if colnum+1 > len(csvSchema):
                break
            add_tde_col(colnum, tdeRow, row[colnum], csvSchema[colnum].values()[0])
            colnum += 1
        tdeTable.insert(tdeRow)
        tdeRow.close()        
    rownum += 1

print '%i rows added in total in %f seconds' % (rownum-1, time.clock()-startTime)

tdeFile.close()

csvFile.close()

person user3797878    schedule 02.07.2014    source источник
comment
Вы использовали _1 _ / _ 2_ и модуль multiprocessing python? Если это так, multiprocessing полагается на cPickle, и поэтому ни один из двух упомянутых вами сериализаторов не будет использоваться. Если вы используете pathos.multiprocessing, который напрямую использует dill, возможно, вам повезет больше.   -  person Mike McKerns    schedule 02.07.2014
comment
Не могли бы вы также показать ошибку, которую вы получаете (при использовании dill)? Это может помочь кому-то найти для вас путь вперед.   -  person Mike McKerns    schedule 02.07.2014
comment
Спасибо за быстрый ответ, Майк, и за отличную работу с укропом.   -  person user3797878    schedule 02.07.2014
comment
Когда я попытался использовать укроп, я вырезал часть кода, в которой строки добавляются в таблицу tableau, и вместо этого добавил каждый объект открытой строки в список. Затем я попытался использовать укроп для сохранения состояния (думая, что могу сделать это с несколькими сценариями, выполняющимися одновременно, и получить состояние для всех из них в другом сценарии). Это привело к следующей ошибке: ValueError: ctypes объекты, содержащие указатели нельзя мариновать   -  person user3797878    schedule 02.07.2014
comment
О… ctypes.pointers. Стон. dill пока не может этого сделать. Пока работают только некоторые типы ctypes, но не типы указателей.   -  person Mike McKerns    schedule 02.07.2014
comment
PiCloud не может справиться ни с этим, ни с чем-либо еще, о чем я знаю.   -  person Mike McKerns    schedule 02.07.2014
comment
Спасибо, Майк. Я напишу здесь, если найду что-нибудь, что может обойти проблему травления указателей, иначе я буду искать Java или C / C ++   -  person user3797878    schedule 02.07.2014


Ответы (1)


Если вам не удается сериализовать тип ctypes.pointer с dill и PiCloud, я думаю, вы застряли. Я не знаю сериализатора, который мог бы обрабатывать эти типы. dill может обрабатывать некоторые из ctypes типов, но не типы указателей. Я предлагаю добавить в github проблему для dill, и, возможно, произойдет чудо, и вы получите новый сериализованный тип. Имея в руках новый шрифт, я бы использовал pathos.multiprocessing, и он должен работать. Однако до тех пор вам, возможно, придется переписать, чтобы обойти проблему сериализации. Например, если вы используете from dill.detect import badobjects, baditems, badtypes, errors, вы можете увидеть, на какой глубине вам нужно выполнить перезапись. Это может быть так же просто, как изменить способ импорта чего-либо, однако, поскольку есть ctypes.pointer, я скептически отношусь к тому, что это будет легко.

person Mike McKerns    schedule 02.07.2014