sqlContext.createDataframe из строки со схемой. pyspark: TypeError: IntegerType не может принимать объект типа ‹типа «юникод»›

Потратив много времени на выяснение, почему я получаю следующую ошибку

pyspark: TypeError: IntegerType can not accept object in type <type 'unicode'>

при попытке создать фрейм данных на основе строк и схемы я заметил следующее:

Строка внутри моего rdd с именем rrdRows выглядит следующим образом:

Row(a="1", b="2", c=3)

и моя dfSchema определена как:

dfSchema = StructType([
        StructField("c", IntegerType(), True),
        StructField("a", StringType(), True),
        StructField("b", StringType(), True)
        ])

создание кадра данных следующим образом:

df = sqlContext.createDataFrame(rddRows, dfSchema)

приводит к вышеупомянутой ошибке, поскольку Spark учитывает только порядок StructFields в схеме и не сопоставляет имя StructFields с именем полей Row.

Другими словами, в приведенном выше примере я заметил, что spark пытается создать фрейм данных, который будет выглядеть следующим образом (если бы не было typeError. e.x, если бы все было типа String)

+---+---+---+
| c | b | a |
+---+---+---+
| 1 | 2 | 3 |
+---+---+---+

это действительно ожидается, или какая-то ошибка?

РЕДАКТИРОВАТЬ: rddRows создаются в соответствии с этими строками:

def createRows(dic):
    res = Row(a=dic["a"],b=dic["b"],c=int(dic["c"])
    return res

rddRows = rddDict.map(createRows)

где rddDict — проанализированный файл JSON.


person Kito    schedule 03.11.2015    source источник
comment
Как вы создаете свой rddRows?   -  person eliasah    schedule 03.11.2015
comment
Код немного великоват для комментария, но я делаю так: def createRows(dic): res = Row(a=dic[a],b=dic[b],c=int(dic[c] ) return res rddRows = rddDict.map(createRows) Где rddDict — это проанализированный файл JSON.Тем не менее, я также попробовал это с другим примером, но получил те же результаты.   -  person Kito    schedule 03.11.2015
comment
тип: ‹класс 'pyspark.rdd.PipelinedRDD'›. Я использую его внутри потоковой передачи искры, но я также наблюдал ту же проблему в очень простом пакетном задании.   -  person Kito    schedule 03.11.2015
comment
Ну, это похоже на ожидаемое поведение. PySpark Row, как и его аналог в Scala, представляет собой просто кортеж. Это означает, что он имеет фиксированный порядок значений и размер. Все остальное, например, имена или схема (в случае версии Scala) — это просто метаданные. Поскольку строка может вообще не иметь имен или имена в схеме могут отличаться от имен в строках, единственным разумным соответствием является порядок. Это отличается, например, от источника JSON, где порядок не имеет значения, а имена — единственный хороший способ сопоставить записи.   -  person zero323    schedule 03.11.2015
comment
Хммм ладно. Благодарю за разъяснение. Возможно, в качестве краткого продолжения: скажем, у меня уже есть другой фрейм данных со столбцами c, b, a, который я хочу добавить к созданному выше фрейму данных. Как лучше всего это реализовать? Я подумал о функции .unionAll. Однако, чтобы использовать его, мне нужен одинаковый порядок столбцов для обоих фреймов данных, верно?   -  person Kito    schedule 03.11.2015
comment
Верно, именно так работает объединение SQL. Вы можете настроить код Scala, который я предоставил здесь, но я думаю, что явное упорядочение намного чище, если у вас нет очень большого числа столбцов. Кстати, если вы принимаете JSON в качестве входных данных, почему бы не использовать SqlContext.read.json?   -  person zero323    schedule 03.11.2015
comment
Да, в итоге я использовал простой список вроде [3, 1, 2] вместо строки. Таким образом, я могу влиять на порядок столбцов фрейма данных. Спасибо за SqlContext.read.json, но я получаю JSON через сокет внутри Spark Streaming. Насколько я знаю, SqlContext.read.json можно использовать только для чтения из файла, верно?   -  person Kito    schedule 03.11.2015
comment
Насколько я знаю в PySpark да. В Scala это тоже может быть RDD[String].   -  person zero323    schedule 04.11.2015


Ответы (1)


Конструктор Row сортирует ключи, если вы предоставляете аргументы ключевого слова. Взгляните на исходный код здесь. Когда я узнал об этом, я отсортировал свой schema соответствующим образом, прежде чем применять его к фрейму данных:

   sorted_fields = sorted(dfSchema.fields, key=lambda x: x.name)
   sorted_schema = StructType(fields=sorted_fields)
   df = sqlContext.createDataFrame(rddRows, sorted_schema)
person architectonic    schedule 10.11.2015