Pyspark: итеративная запись результатов UDF обратно в фрейм данных не дает ожидаемых результатов

Я все еще новичок в pyspark, и я пытаюсь оценить функцию и итеративно создавать столбцы с помощью UDF. Ниже приведена функция:

def get_temp(df):
    l=['temp1','temp2','temp3']
    s=[0]
    pt = [0]
    start = [0]
    end = [0]
    cummulative_stat = [0]
    for p in xrange(1,4):
        def func(p):
            if p==1:
                pass
            elif p >1:
                start[0] = end[0]
                s[0]=2
                pt[0] =4
            end[0] = start[0] + pt[0] - s[0]
            return end[0]
        func_udf=udf(func,IntegerType())
        df=df.withColumn(l[p-1],func_udf(lit(p)))
    return df
df=get_temp(df)
df.show()

Вышеизложенное дает результат:

+---+---+---+-----+-----+-----+
|  a|  b|  c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
|  2| 12|  5|    0|    2|    2|
|  8|  5|  7|    0|    4|    4|
|  9|  4|  3|    0|    2|    2|
|  3|  8|  2|    0|    4|    4|
+---+---+---+-----+-----+-----+

Ожидаемый результат:

+---+---+---+-----+-----+-----+
|  a|  b|  c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
|  2| 12|  5|    0|    2|    4|
|  8|  5|  7|    0|    2|    4|
|  9|  4|  3|    0|    2|    4|
|  3|  8|  2|    0|    2|    4|
+---+---+---+-----+-----+-----+

Если я посмотрю вывод только внутренней функции, результат будет таким, как и ожидалось, т.е.:

s=[0]
pt = [0]
start = [0]
end = [0]
cummulative_stat = [0]
for p in xrange(1,4):
    def func():
        if p==1:
            pass
        elif p >1:
            start[0] = end[0]
            s[0]=2
            pt[0] =4
        end[0] = start[0] + pt[0] - s[0]
        return end[0]
    e=func()
    print e

output:
0
2
4

Не уверен, как правильно записать эти результаты из UDF в df. Опубликованный кадр данных - это всего лишь образец кадра данных, мне нужно будет использовать цикл for, потому что в моем исходном коде я вызываю другие функции (чей вывод зависит от значения итератора) в цикле for. Например, см. Ниже:

def get_temp(df):
    l=['temp1','temp2','temp3']
    s=[0]
    pt = [0]
    start = [0]
    end = [0]
    q=[]
    cummulative_stat = [0]
    for p in xrange(1,4):
        def func(p):
            if p < a:
                cummulative_stat[0]=cummulative_stat[0]+52
                pass
            elif p >=a:

                if p==1:
                    pass
                elif p >1:
                    start[0] = end[0]
                    s[0]=2
                    pt[0] =4
                if cummulative_stat and p >1:
                    var1=func2(p,3000)
                    var2=func3(var1)
                    cummulative_stat=np.nan
                else:
                    var1=func2(p,3000)
                    var2=func3(var1)         
                end[0] = start[0] + pt[0] - s[0]
            q.append(end[0],var1,var2)
            return q
        func_udf=udf(func,ArrayType(ArrayType(IntegerType())))
        df=df.withColumn(l[p-1],func_udf(lit(p)))
    return df
df=get_temp(df)
df.show()

Я использую pyspark 2.2. Любая помощь горячо приветствуется. Чтобы создать этот фрейм данных:

rdd =  sc.parallelize([(2,12,5),(8,5,7),
                 (9,4,3),
                  (3,8,2)])
df = sqlContext.createDataFrame(rdd, ('a', 'b','c'))
df.show()

person Mia21    schedule 19.03.2018    source источник
comment
почему вы хотите использовать udf для?   -  person Ramesh Maharjan    schedule 19.03.2018
comment
Может ли это быть проблемой XY? Что ты пытаешься сделать? Вероятно, есть более простой подход.   -  person pault    schedule 19.03.2018
comment
@pault я обновил вопрос с образцом кода. Я хочу использовать udf, потому что я делаю внутри него вызовы других функций и, в конечном итоге, выполняю некоторые математические операции с вызываемыми функциями и возвращаю результат. Показанная выше функция: func вычисляет вызовы других функций.   -  person Mia21    schedule 19.03.2018
comment
поскольку ваши функции различны для разных столбцов, я бы предложил вам написать разные функции для каждого столбца и вызывать их отдельно.   -  person Ramesh Maharjan    schedule 19.03.2018
comment
Не могли бы вы пролить больше света на это? Не знаю, как разделить на отдельные функции, поскольку cummulative_stat оценивается на основе его значения, полученного в предыдущей итерации. Спасибо!   -  person Mia21    schedule 19.03.2018


Ответы (1)


Из того, что я понял, глядя на ваши коды, так это то, что значение вашего следующего столбца зависит от предыдущего. Если я правильно понимаю, то могу сказать, что ваше определение функции udf размещено не в том месте. И нужны небольшие изменения в ваших кодах, чтобы все заработало.

Пойдем шаг за шагом

У тебя уже есть

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  2| 12|  5|
|  8|  5|  7|
|  9|  4|  3|
|  3|  8|  2|
+---+---+---+

Нам нужен столбец инициализатора, и я вижу, что он равен 0

from pyspark.sql import functions as F
from pyspark.sql import types as T

df=df.withColumn('temp0', F.lit(0))

что должно быть

+---+---+---+-----+
|  a|  b|  c|temp0|
+---+---+---+-----+
|  2| 12|  5|    0|
|  8|  5|  7|    0|
|  9|  4|  3|    0|
|  3|  8|  2|    0|
+---+---+---+-----+

Мы должны переместить функцию udf за пределы цикла как

def func(p, end):
    start = 0
    s = 0
    pt = 0
    if p==1:
        pass
    elif p >1:
        start = end
        s=2
        pt =4
    end = start + pt - s
    return end

func_udf=F.udf(func, T.IntegerType())

и вызвать функцию udf внутри цикла как

def get_temp(df):
    l=['temp1','temp2','temp3']
    for p in xrange(1,4):
        df=df.withColumn(l[p-1],func_udf(F.lit(p), F.col('temp'+str(p-1))))
    return df

df=get_temp(df)

и, наконец, удалить столбец инициализатора

df=df.drop('temp0')

который должен дать вам желаемый результат

+---+---+---+-----+-----+-----+
|  a|  b|  c|temp1|temp2|temp3|
+---+---+---+-----+-----+-----+
|  2| 12|  5|    0|    2|    4|
|  8|  5|  7|    0|    2|    4|
|  9|  4|  3|    0|    2|    4|
|  3|  8|  2|    0|    2|    4|
+---+---+---+-----+-----+-----+

надеюсь ответ будет полезен

person Ramesh Maharjan    schedule 24.03.2018