Home > Blockchain >  Reading Multiline JSON File in spark comes in one row
Reading Multiline JSON File in spark comes in one row

Time:01-25

I have a json file like below -

{
  "249": "\"Other\"",
  "63": "\"Billing\"",
  "67": "\"Handset\"",
  "72": "\"Your plan\"",
  "71": "\"Customer services\"",
  "69": "\"Network coverage\"",
  "68": "\"International roaming\"",
  "770": "\"Purchases\"",
  "70": "\"Expectations not being met\"",
  "65": "\"Fraud\""
}

I am reading this file using multiline spark.read method -

val df = sqlContext.read.option("multiline","true").json("file:///category_names.json")

Dataframe read is :

 --------------------------- --------- ------- --------- ----------------------- ------------------ ---------------------------- ------------------- ----------- ----------- 
|249                        |63       |65     |67       |68                     |69                |70                          |71                 |72         |770        |
 --------------------------- --------- ------- --------- ----------------------- ------------------ ---------------------------- ------------------- ----------- ----------- 
|"Other (none of the above)"|"Billing"|"Fraud"|"Handset"|"International roaming"|"Network coverage"|"Expectations not being met"|"Customer services"|"Your plan"|"Purchases"|
 --------------------------- --------- ------- --------- ----------------------- ------------------ ---------------------------- ------------------- ----------- ----------- 

I want to join this dataframe with another dataframe where column name here is a primary key there. I want the output in the below format

CategroryID CategoryName
249           "Other"
63            "Billing"

Is there a spark way of doing this? I can pivot the dataframe but i am looking for a better way to do this.

CodePudding user response:

Use stack function to unpivot the dataframe. You can dynamically generate the stack expression from column names list:

val stackExpr = s"stack(${df.columns.size},"   df.columns
  .flatMap(c => Seq(c, s"`$c`"))
  .mkString(",")   ") as (CategroryID, CategoryName)"

//stackExpr: String = stack(10, 249,`249`,63,`63`,65,`65`,67,`67`,68,`68`,69,`69`,70,`70`,71,`71`,72,`72`,770,`770`) as (CategroryID, CategoryName)

val df1 = df.selectExpr(stackExpr)

df1.show()

// ----------- -------------------- 
//|CategroryID|        CategoryName|
// ----------- -------------------- 
//|        249|             "Other"|
//|         63|           "Billing"|
//|         65|             "Fraud"|
//|         67|           "Handset"|
//|         68|"International ro...|
//|         69|  "Network coverage"|
//|         70|"Expectations not...|
//|         71| "Customer services"|
//|         72|         "Your plan"|
//|        770|         "Purchases"|
// ----------- -------------------- 

Another way by creating map column from each row then explode it:

import org.apache.spark.sql.functions.map

val mapExpr = map(df.columns.flatMap(c => Seq(lit(c), col(c))):_*)
val df1 = df.select(explode(mapExpr).as(Seq("CategroryID", "CategoryName")))

CodePudding user response:

For academic purposes and me refreshing my skills so now and again, an alternative - but you will need to rename cols etc. as I just used my own and assumed the DF is there and not addressing JSON:

import org.apache.spark.sql.functions._

val df   = sqlContext.createDataFrame(Seq(("xxx", "yyy", "zzz"))).toDF("v1", "v2", "v3")
val cols = df.columns
val df2  = df.withColumn("arrayColNames", array(cols.map(lit):_*))
             .withColumn("arrayColVals",  array(cols.map(df(_)):_*))

val df3 = df2.withColumn("arrayNamesVals", arrays_zip(col("arrayColNames"), col("arrayColVals")));
val df4 = df3.withColumn("aNV", explode($"arrayNamesVals"))
val df5 = df4.select($"aNV.*")    
df5.show(false)

returns:

 ------------- ------------ 
|arrayColNames|arrayColVals|
 ------------- ------------ 
|v1           |xxx         |
|v2           |yyy         |
|v3           |zzz         |
 ------------- ------------ 
  •  Tags:  
  • Related