Я относительно новичок в Python и изо всех сил пытаюсь использовать модуль многопроцессорности для выполнения некоторого преобразования данных с интенсивным использованием ЦП. У меня есть большой кусок данных (~ 400 000 наблюдений ~ 300 переменных) в формате csv, который я хочу преобразовать в извлечение данных таблицы с помощью их Python API. Написание сценария для преобразования несложно, но для его завершения требуется около 15 минут, так как только 1 ЦП выполняет работу (с Tableau Desktop это занимает всего около 90 секунд). Мне нужно использовать все мои 8 ядер, чтобы преобразование прошло быстрее.
Моя первоначальная идея заключалась в том, чтобы разделить данные на 8 частей, заставить по 8 рабочих процессов создавать списки строк таблицы с помощью модуля многопроцессорности, а затем объединить строки в одну таблицу tde. Однако, поскольку объекты / классы строк Tableau определены в отдельном модуле (API tableau), я получаю ошибки травления и указателя. API сложен и основан на ряде других модулей, поэтому все мои попытки восстановить необходимые определения в основном глобальном пространстве потерпели неудачу.
Я пробовал использовать Dill и PiCloud, но обе попытки по-прежнему приводят к ошибкам травления или указателя. Кто-нибудь знает эффективный способ сериализации и / или многопроцессорной обработки вычислений в Python, который полагается на методы / объекты, определенные во внешнем пакете (без необходимости копаться в пакете, чтобы попытаться воссоздать колесо в вашей программе)?
Ниже приведена рабочая программа, которую я хочу использовать в нескольких процессах (я в значительной степени опирался на опубликованную здесь работу Брайана Бикелла http://www.interworks.com/blogs/bbickell/2012/12/06/introduction-python-tableau-data-extract-api-csv-extract-example):
from sys import argv
import os, csv, datetime, time
import dataextract as tde
csv.field_size_limit(10000000)
## Functions
# This function makes adding the columns to each row in the extract a bit easier.
def add_tde_col(colnum, row, val, t):
# Date format used below
dateformat = '%Y-%mm-%dd %H:%M:%S.%f'
if t == tdeTypes['INTEGER']:
try:
convert = int(val)
row.setInteger(colnum, convert)
except ValueError:
#if we bomb the cast then we just add a null
row.setNull(colnum)
elif t == tdeTypes['DOUBLE']:
try:
convert = float(val)
row.setDouble(colnum, convert)
except ValueError:
row.setNull(colnum)
elif t == tdeTypes['BOOLEAN']:
try:
convert = int(val)
if convert > -1 and convert <= 1:
row.setBoolean(colnum, convert)
else:
row.setNull(colnum)
except ValueError:
row.setNull(colnum)
elif t == tdeTypes['DATETIME']:
try:
d = datetime.datetime.strptime(val, dateformat)
row.setDate(colnum, d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
except ValueError:
row.setNull(colnum)
elif t == tdeTypes['CHAR_STRING']:
row.setCharString(colnum, val)
elif t == tdeTypes['UNICODE_STRING']:
row.setString(colnum, val)
else:
print 'Error'
row.setNull(colnum)
# define csv input
inputFile = 'test1.csv'
## Parameters
tdeFileName = 'tdetest1.tde'
startTime = time.clock()
# Handy dictionary of Tableau data types
tdeTypes = {'INTEGER': 7, 'DOUBLE': 10, 'BOOLEAN': 11, 'DATE': 12, 'DATETIME': 13, 'DURATION': 14,
'CHAR_STRING': 15, 'UNICODE_STRING': 16}
## Define CSV Schema in dict, (truncated here for brevity)
csvSchema = []
csvSchema.append({'fAsOfDate': tdeTypes['DATETIME']})
csvSchema.append({'AsOfDate_Max': tdeTypes['DATETIME']})
csvSchema.append({'LoanID': tdeTypes['INTEGER']})
csvSchema.append({'lenderdatabaseid': tdeTypes['INTEGER']})
csvSchema.append({'loanrecordid': tdeTypes['INTEGER']})
csvSchema.append({'random_num': tdeTypes['INTEGER']})
# Try to create extract, delete if found.
try:
tdeFile = tde.Extract(tdeFileName)
except:
os.system('del '+tdeFileName)
os.system('del DataExtract.log')
tdeFile = tde.Extract(tdeFileName)
# Open CSV
csvFile = open(inputFile, "rU")
reader = csv.reader(csvFile, delimiter = '^')
print 'Reading records from %s' % (inputFile)
# Create TDE table definition
tdeTableDef = tde.TableDefinition()
print 'Defined table schema:'
# Build TDE Table Def from csv schema dict
for index, item in enumerate(csvSchema):
for k, v in item.items():
print 'Column %i: %s <%s>' % (index, k, tdeTypes.keys() [tdeTypes.values().index(v)])
tdeTableDef.addColumn(k, v)
# Add table to extract
tdeTable = tdeFile.addTable("Extract",tdeTableDef)
print 'Writing records to %s' % (tdeFileName)
# iterate through rows and columns of csv -> add to tde
rownum = 0
for row in reader:
if rownum == 0:
header = row
else:
colnum = 0
tdeRow = tde.Row(tdeTableDef)
for col in row:
if colnum+1 > len(csvSchema):
break
add_tde_col(colnum, tdeRow, row[colnum], csvSchema[colnum].values()[0])
colnum += 1
tdeTable.insert(tdeRow)
tdeRow.close()
rownum += 1
print '%i rows added in total in %f seconds' % (rownum-1, time.clock()-startTime)
tdeFile.close()
csvFile.close()
multiprocessing
python? Если это так,multiprocessing
полагается наcPickle
, и поэтому ни один из двух упомянутых вами сериализаторов не будет использоваться. Если вы используетеpathos.multiprocessing
, который напрямую используетdill
, возможно, вам повезет больше. - person Mike McKerns   schedule 02.07.2014dill
)? Это может помочь кому-то найти для вас путь вперед. - person Mike McKerns   schedule 02.07.2014ctypes.pointers
. Стон.dill
пока не может этого сделать. Пока работают только некоторые типыctypes
, но не типы указателей. - person Mike McKerns   schedule 02.07.2014PiCloud
не может справиться ни с этим, ни с чем-либо еще, о чем я знаю. - person Mike McKerns   schedule 02.07.2014