Я все еще новичок в pyspark, и я пытаюсь оценить функцию и итеративно создавать столбцы с помощью UDF. Ниже приведена функция:
def get_temp(df):
l=['temp1','temp2','temp3']
s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
def func(p):
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
end[0] = start[0] + pt[0] - s[0]
return end[0]
func_udf=udf(func,IntegerType())
df=df.withColumn(l[p-1],func_udf(lit(p)))
return df
df=get_temp(df)
df.show()
Вышеизложенное дает результат:
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 2|
| 8| 5| 7| 0| 4| 4|
| 9| 4| 3| 0| 2| 2|
| 3| 8| 2| 0| 4| 4|
+---+---+---+-----+-----+-----+
Ожидаемый результат:
+---+---+---+-----+-----+-----+
| a| b| c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
| 2| 12| 5| 0| 2| 4|
| 8| 5| 7| 0| 2| 4|
| 9| 4| 3| 0| 2| 4|
| 3| 8| 2| 0| 2| 4|
+---+---+---+-----+-----+-----+
Если я посмотрю вывод только внутренней функции, результат будет таким, как и ожидалось, т.е.:
s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
def func():
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
end[0] = start[0] + pt[0] - s[0]
return end[0]
e=func()
print e
output:
0
2
4
Не уверен, как правильно записать эти результаты из UDF в df. Опубликованный кадр данных - это всего лишь образец кадра данных, мне нужно будет использовать цикл for, потому что в моем исходном коде я вызываю другие функции (чей вывод зависит от значения итератора) в цикле for. Например, см. Ниже:
def get_temp(df):
l=['temp1','temp2','temp3']
s=[0]
pt = [0]
start = [0]
end = [0]
q=[]
cummulative_stat = [0]
for p in xrange(1,4):
def func(p):
if p < a:
cummulative_stat[0]=cummulative_stat[0]+52
pass
elif p >=a:
if p==1:
pass
elif p >1:
start[0] = end[0]
s[0]=2
pt[0] =4
if cummulative_stat and p >1:
var1=func2(p,3000)
var2=func3(var1)
cummulative_stat=np.nan
else:
var1=func2(p,3000)
var2=func3(var1)
end[0] = start[0] + pt[0] - s[0]
q.append(end[0],var1,var2)
return q
func_udf=udf(func,ArrayType(ArrayType(IntegerType())))
df=df.withColumn(l[p-1],func_udf(lit(p)))
return df
df=get_temp(df)
df.show()
Я использую pyspark 2.2. Любая помощь горячо приветствуется. Чтобы создать этот фрейм данных:
rdd = sc.parallelize([(2,12,5),(8,5,7),
(9,4,3),
(3,8,2)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c'))
df.show()