Home > Net >  Pyspark remove field in struct column
Pyspark remove field in struct column

Time:01-27

I want to remove a part of a value in a struct and save that version of the value as a new column in my dataframe, which looks something like this:

column
{"A": "2022-01-26T14:21:32.214 0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}

I want to remove the field C and it's values and save the rest as one new column without dividing A, B, D fields into different columns. What I want should look like this:

column newColumn
{"A": "2022-01-26T14:21:32.214 0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"} {"A": "2022-01-26T14:21:32.214 0000", "B": 69, "D": "XD"}

I have successfully removed C by converting my dataframe to a dict, but now I can't manage to convert it back into ONE column. My attempt at removing C looks like this:

dfTemp = df.select('column').collect()[0][0].asDict(True)
dfDict = {}
for k in dfTemp:
    if k != 'C':
        dfDict[k] = dfTemp[k]

If you have a better way to remove a part of struct like mine and keeping the result in one column and not adding more rows or if you know how to convert a dict to a dataframe without dividing the key and value pairs into separate columns please leave a suggestion.

CodePudding user response:

Well, it's not trivial as it would seems. First, your approach is not meant for Spark, unless you're working with very little data (and so, you don't need Spark) and you're better off using pure Python like you tried. Using collect() fetch all data on the driver which would not work with large data.

The distributed approach for this is as follows:

  • infer schema on part of your JSON data (unless you want to do it manually - which is tedious)
  • transform your dataframe with this schema to have access to named attributes
  • select attributes as needed and back to JSON

I tried to decompose as much as I could here:

from pyspark.sql.types import IntegerType, StructType, StringType, StructField
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import json

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Create input data
data = [json.dumps({"A": "2022-01-26T14:21:32.214 0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"})]
df = spark.createDataFrame(data, "string").toDF("colA")
df.show()

 ----------------------------------------------------------------------------------------- 
|colA                                                                                     |
 ----------------------------------------------------------------------------------------- 
|{"A": "2022-01-26T14:21:32.214 0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}|
 ----------------------------------------------------------------------------------------- 


# Infer schema - infering on first 10 rows
s = df.select(F.col("colA").alias("s")).rdd.map(lambda x: x.s).take(10)
schema = spark.read.json(sc.parallelize(s)).schema
print(schema)

# StructType(List(StructField(A,StringType,true),StructField(B,LongType,true),StructField(C,StructType(List(StructField(CA,LongType,true),StructField(CB,StringType,true))),true),StructField(D,StringType,true)))


# read JSON string with schema
new_df = df.withColumn("colB", F.from_json("colA", schema))
new_df.show(truncate=False)

 ----------------------------------------------------------------------------------------- --------------------------------------------------- 
|colA                                                                                     |colB                                               |
 ----------------------------------------------------------------------------------------- --------------------------------------------------- 
|{"A": "2022-01-26T14:21:32.214 0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}|{2022-01-26T14:21:32.214 0000, 69, {42, Hello}, XD}|
 ----------------------------------------------------------------------------------------- --------------------------------------------------- 


# Finally ...
new_df.select(F.to_json(F.struct("colB.A", "colB.B", "colB.D")).alias("colC")).show(truncate=False)

 ---------------------------------------------------- 
|colC                                                |
 ---------------------------------------------------- 
|{"A":"2022-01-26T14:21:32.214 0000","B":69,"D":"XD"}|
 ---------------------------------------------------- 

CodePudding user response:

Assuming your column is of type string and contains json, you can first parse it into StructType using from_json like this:

df = spark.createDataFrame([
    ('{"A": "2022-01-26T14:21:32.214 0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}',)
], ["column"])

df = df.withColumn(
    "parsed_column",
    F.from_json("column", "struct<A:string,B:int,C:struct<A:int,CB:string>,D:string>")
)

Now removing the field C from the struct column:

Spark >=3.1

Use dropFields method:

result = df.withColumn("newColumn", F.to_json(F.col("parsed_column").dropFields("C"))).drop("parsed_column")

result.show(truncate=False)

# ----------------------------------------------------------------------------------------- ---------------------------------------------------- 
#|column                                                                                   |newColumn                                           |
# ----------------------------------------------------------------------------------------- ---------------------------------------------------- 
#|{"A": "2022-01-26T14:21:32.214 0000", "B": 69, "C": {"CA": 42, "CB": "Hello"}, "D": "XD"}|{"A":"2022-01-26T14:21:32.214 0000","B":69,"D":"XD"}|
# ----------------------------------------------------------------------------------------- ---------------------------------------------------- 

Spark <3.1

Recreate the struct column and filter the field C

result = df.withColumn(
    "newColumn",
    F.to_json(
        F.struct(*[
            F.col(f"parsed_column.{c}").alias(c)
            for c in df.selectExpr("parsed_column.*").columns if c != 'C'
        ])
    )
).drop("parsed_column")

Another method by parsing the json string values into MapType then applying function map_filter to remove key C:

result = df.withColumn(
    "newColumn",
    F.to_json(
        F.map_filter(
            F.from_json("column", "map<string,string>"),
            lambda k, v: k != "C"
        )
    )
  •  Tags:  
  • Related