I have a .csv file with several columns.
One row as an example:
aaa,bbb,{'foo': 'xxx', 'bar': 'zzz'}
I want to read it and transform it into the following typed schema:
field1: String,
field2: String,
field3: Map[String, String]
I can do this with primitive types like this:
private val someSchema =
  StructType(
    StructField("field1", StringType, true) ::
    StructField("field2", StringType, true) ::
    StructField("field3", StringType, true) :: Nil)

spark.read
  .format("csv")
  .option("header", true)
  .schema(someSchema)
  .load("path.csv")
But when I declare field3 as Map[String, String], it fails with:
Exception in thread "main" org.apache.spark.sql.AnalysisException: CSV data source does not support map<string,string> data type.
How can I do this another way?
CodePudding user response:
You need to read the column as a string and then convert it to a MapType. Replace the single quotes in field3 with double quotes to get valid JSON strings, then apply the from_json function to parse them as map<string,string>:
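import org.apache.spark.sql.functions.{col, from_json, lit, regexp_replace}

// df is the DataFrame read with field3 declared as StringType
val result = df.withColumn(
  "field3",
  from_json(
    regexp_replace(col("field3"), "'", "\""),
    lit("map<string,string>")
  )
)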
However, your example suggests the values are not quoted or escaped in the file. In that case Spark will not parse the rows correctly, because the values of field3 contain commas, which are also the column delimiter.
In this case you can read the file as text, replace the commas inside the {} with another delimiter (say ;), split on , to get the three columns, and convert field3 to a map with the str_to_map function:
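import org.apache.spark.sql.functions.{col, expr, regexp_replace, split}

// Read each line as a single string column named "value"
val df = spark.read.text("/path/file.csv")

val result = df.withColumn(
  "value",
  // Replace commas inside {...} with ';', then split on the remaining commas
  split(regexp_replace(col("value"), ",(?=[^{}]*\\})", ";"), ",")
).select(
  col("value")(0).as("field1"),
  col("value")(1).as("field2"),
  // Strip braces, quotes and spaces, leaving e.g. foo:xxx;bar:zzz
  regexp_replace(col("value")(2), "[{}' ]", "").as("field3")
).withColumn(
  "field3",
  expr("str_to_map(field3, ';', ':')")
)

result.show(false)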
//+------+------+------------------------+
//|field1|field2|field3                  |
//+------+------+------------------------+
//|aaa   |bbb   |[foo -> xxx, bar -> zzz]|
//+------+------+------------------------+
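
If it helps to see what the two regular expressions do to the sample row, here is a minimal plain-Scala sketch of the same string steps, without Spark. The object RowParsingSketch and the helper parseRow are hypothetical names used only for illustration, and the final key/value split only roughly mimics what str_to_map does:

```scala
// Plain-Scala illustration (no Spark) of the string transformations used above.
// RowParsingSketch and parseRow are hypothetical names, not part of any library.
object RowParsingSketch {
  // Matches a comma that is followed by a closing brace with no other brace
  // in between, i.e. a comma located inside {...}.
  private val CommaInsideBraces = ",(?=[^{}]*\\})"

  def parseRow(line: String): (String, String, Map[String, String]) = {
    // 1. Turn the commas inside the braces into ';', then split on the rest.
    val parts = line.replaceAll(CommaInsideBraces, ";").split(",")

    // 2. Drop braces, single quotes and spaces: "foo:xxx;bar:zzz".
    val cleaned = parts(2).replaceAll("[{}' ]", "")

    // 3. Split into key:value pairs (roughly what str_to_map does in Spark).
    val field3 = cleaned
      .split(";")
      .filter(_.nonEmpty)
      .map { pair =>
        val Array(key, value) = pair.split(":", 2)
        key -> value
      }
      .toMap

    (parts(0), parts(1), field3)
  }

  def main(args: Array[String]): Unit = {
    val row = "aaa,bbb,{'foo': 'xxx', 'bar': 'zzz'}"
    println(parseRow(row))
  }
}
```

Like the Spark version, this assumes the first two fields contain no commas or braces, and that keys and values contain no spaces, quotes, `;` or `:` characters, since those are stripped or used as delimiters.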
