Update some rows of a dataframe or create new dataframe in PySpark

Time:01-28

I am new to PySpark, and my objective is to use a PySpark script in AWS Glue to:

  1. read a dataframe from an input file in Glue => done
  2. change some columns of the rows that satisfy a condition => facing issues
  3. write the updated dataframe, with the same schema, to S3 => done

The task seems very simple, but I could not find a way to complete it, and I keep running into different issues as I change my code.

So far, my code looks like this:

Transform2.printSchema() # input schema after reading 
Transform2 = Transform2.toDF()
def updateRow(row):
    # my logic to update row based on a global condition 
    #if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
    return row

LocalTransform3 = [] # creating new dataframe from Transform2 
for row in Transform2.rdd.collect():
    row = row.asDict()
    row = updateRow(row)
    LocalTransform3.append(row)
print(len(LocalTransform3))

columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)

Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count()) 

I tried several approaches:

  • using map to update existing rows in a lambda
  • using collect()
  • using createDataFrame() to create a new dataframe

But I ran into errors at these steps:

  • not able to create a new, updated RDD
  • not able to create a new dataframe from the RDD using the existing columns

Some of the errors I got in Glue, at different stages:

  • ValueError: Some of types cannot be determined after inferring
  • ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
  • An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. Traceback (most recent call last):

Any working code snippet or other help would be appreciated.

CodePudding user response:

from pyspark.sql.functions import col, lit, when

Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn('otherAttribute', when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute')))

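This should work for your use case. when(...).otherwise(...) is a column expression, so Spark evaluates it on the executors without collecting rows to the driver, and the dataframe keeps its existing columns. The updated dataframe is withKeyMapping, which can be converted back with DynamicFrame.fromDF as in the question.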
