Given superset and superset2 input text files. Superset is having all the required headers (keys).
Rows from superset2 file may be missing value for some column in header, need to fill missing value with $ character.
A. superset:
a|b|c|d|e|f|g|h|i|j
B. superset2:
a:1,b:1,d:1,e:1,h:1
a:2,e:2,d:2,h:2,f:2
c:3,e:3,d:3,h:3,f:3
a:4,b:4,c:4,e:4,h:4,f:4,i:4,j:4
Expected output:
a|b|c|d|e|f|g|h|i|j
1|1|$|1|1|$|$|1|$|$
2|$|$|2|2|2|$|2|$|$
$|$ |3|3|3|3|$|3|$|$
4|4|4|$|4|4|$|4|4|4
CodePudding user response:
Read the 2 files into Dataframes and:
- get the list of keys (columns) of the the first dataframe
- do some transformations on the second dataframe which contains the data, by splitting the values first by
,then second by:using combination oftransformandmap_from_entriesfunctions to convert each row into a map column - finally using list comprehension on the list of keys select the columns and
fillnato replace nulls by$:
from pyspark.sql import functions as F
keys = spark.read.csv(keys_file_path, sep="|", header=True).columns
data = spark.read.text(data_file_path)
df = data.withColumn(
"value",
F.map_from_entries(
F.expr("""transform(
split(value , ','),
x -> struct(split(x, ':')[0] as col, split(x, ':')[1] as val)
)""")
)
).select(*[
F.col("value").getItem(k).alias(k) for k in keys
]).fillna("$")
df.show(truncate=False)
# --- --- --- --- --- --- --- --- --- ---
#|a |b |c |d |e |f |g |h |i |j |
# --- --- --- --- --- --- --- --- --- ---
#|1 |1 |$ |1 |1 |$ |$ |1 |$ |$ |
#|2 |$ |$ |2 |2 |2 |$ |2 |$ |$ |
#|$ |$ |3 |3 |3 |3 |$ |3 |$ |$ |
#|4 |4 |4 |$ |4 |4 |$ |4 |4 |4 |
# --- --- --- --- --- --- --- --- --- ---
