PySpark Python Сортировка кадра данных с использованием столбца

Итак, у меня есть 2 вопроса, которые, я думаю, должны быть основными для людей, имеющих опыт работы с PySpark, но я не могу их решить.

Примеры записей в моем файле csv:

"dfg.AAIXpWU4Q","1"
"cvbc.AAU3aXfQ","1"
"T-L5aw0L1uT_OfFyzbk","1"
"D9TOXY7rA_LsnvwQa-awVk","2"
"JWg8_0lGDA7OCwWcH_9aDc","2"
"ewrq.AAbRaACr2tVh5wA","1"
"ewrq.AALJWAAC-Qku3heg","1"
"ewrq.AADStQqmhJ7A","2"
"ewrq.AAEAABh36oHUNA","1"
"ewrq.AALJABfV5u-7Yg","1"

Я создаю следующий фреймворк данных:

>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows

Во-первых, это правильный способ преобразовать столбец hits в IntegerType()? Почему все значения становятся null?

>>> df2 = df2.withColumn("hits", df2["hits"].cast(IntegerType()))
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...|null|
|"yDQ...|null|
|"qUU...|null|
+-------+----+
only showing top 3 rows

Во-вторых, мне нужно отсортировать этот список в порядке убывания по столбцу hits. Итак, я попробовал это-

>>> df1 = df2.sort(col('hits').desc())
>>> df1.show(20)

Но я получаю следующую ошибку-

java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided.

Я предполагаю, что это связано с тем, что я создаю свой фрейм данных, используя:

>>> rdd = sc.textFile("/path/to/file/*")
>>> rdd.take(2)
['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
​
>>> my_df = rdd.map(lambda x: (x.split(","))).toDF()

>>> df2 = my_df.selectExpr("_1 as user_id", "_2 as hits")
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows

И я предполагаю, что в некоторых строках есть лишние запятые. Как этого избежать или как лучше всего прочитать этот файл?


person kev    schedule 09.09.2019    source источник


Ответы (2)


ОБНОВЛЕНИЕ

-- Добавление файла для чтения и разделения

глядя на пример выше, создал такой файл

'"7wAfdgdfgd","7"'
'"1x3Qdfgdf","1"'
'"13xxyyzzsdff","13"'

--Обратите внимание на ', чтобы сделать все строки одной строкой. Теперь код для ее чтения:

scala> val myRdd = sc.textFile("test_file.dat")
myRdd: org.apache.spark.rdd.RDD[String] = test_file.dat MapPartitionsRDD[1] at textFile at <console>:24
// please check the type of RDD , here it is string
// We need to have Iterable[tuple(String,String)] to convert it into Dataframe

scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1)))
res0: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at map at <console>:26

// Finally
    scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1))).toDF("user_id","hits").show(false)
+--------------+----+
|user_id       |hits|
+--------------+----+
|"7wAfdgdfgd"  |"7" |
|"1x3Qdfgdf"   |"1" |
|"13xxyyzzsdff"|"13"|
+--------------+----+

КОНЕЦ ОБНОВЛЕНИЯ

так как вы новичок (или нет), я рекомендую/практикую использование фактического ANSI sql вместо pyspark.sql.functions. Это упрощает обслуживание + нет никаких преимуществ использования sql.functions по сравнению с ansi sql. Очевидно, вам нужно знать функции sql/columns, предоставляемые spark, поскольку я использовал split, orderBy и cast в ответе. Поскольку вы не предоставили содержание text file, вот мое мнение и все 3 ответа в одном SQL

    myDf = spark.createDataFrame([("abc","7"),("xyz","18"),("lmn","4,xyz")],schema=["user_id","hits"])
myDf.show(20,False)
+-------+-----+
|user_id|hits |
+-------+-----+
|abc    |7    |
|xyz    |18   |
|lmn    |4,xyz|
+-------+-----+

myDf.createOrReplaceTempView("hits_table")

SQL + результат

    spark.sql("select user_id, cast(split(hits,',')[0] as integer) as hits from hits_table order by hits desc ").show(20,False)
    +-------+----+
    |user_id|hits|
    +-------+----+
    |xyz    |18  |
    |abc    |7   |
    |lmn    |4   |
    +-------+----+
person SanBan    schedule 09.09.2019
comment
Вы можете создать фрейм данных из RDD, как я? Я думаю, что строка my_df = rdd.map(lambda x: (x.split(","))).toDF() является основной причиной моей проблемы. Я получаю сообщение об ошибке Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided. Вы создаете фрейм данных из простых данных. - person kev; 10.09.2019
comment
@kev, не могли бы вы вставить образец необработанных данных, пожалуйста? - person SanBan; 10.09.2019
comment
@kev, пожалуйста, проверьте мой обновленный ответ о чтении как RDD, его разделении и преобразовании в фрейм данных. Также показано, как тип изменяется при каждой операции. - person SanBan; 10.09.2019
comment
не могли бы вы дать мне ответы для Python? - person kev; 10.09.2019
comment
@kev Разве Python не использует «[]» для получения индекса списка? Заработает ? - person SanBan; 12.09.2019
comment
Также для использования toDF создайте объект structType с двумя structFields, каждый из которых имеет строку типа - person SanBan; 12.09.2019
comment
можете ли вы проверить новый ответ, я опубликовал результаты - person kev; 12.09.2019

Итак, по ответу @SanBan я получил следующие результаты:

>>> rdd = sc.textFile("/home/jsanghvi/work/buffer/*")

>>> schema =  StructType([StructField ("user_id", StringType(), True), StructField ("hits", StringType(), True)])

>>> my_rdd = rdd.map(lambda x: x.replace("'","")).map(lambda x: x.split(",")).map(lambda x: (x[0],x[1]))

>>> my_rdd2 = my_rdd.map(lambda x: str(x).replace("'","").replace("(", "").replace(")", "")).map(lambda x: x.split(",")).map(lambda x: (x[0],x[1]))

>>> df1 = spark.createDataFrame(my_rdd2, schema)

>>> dfx = df1.sort(col('hits').desc())

>>> dfx.show(5)
+----------------+--------------------+                                     
|         user_id|                hits|
+----------------+--------------------+
|"AUDIO_AUTO_PLAY| EXPANDABLE_AUTOM...|
|       "user_id"|             "_col1"|
| "AAESjk66lDk...|              "9999"|
| "ABexsk6sLlc...|              "9999"|
| "AAgb1k65pHI...|              "9999"|
+----------------+--------------------+

# removing garbage rows
>>> dfx = df2.filter(~col("hits").isin(["_col1", "EXPANDABLE_AUTOM..."]))
person kev    schedule 12.09.2019