My data looks like this:
Journey Table
| SERNR | TYPE |
|---|---|
| 123 | null |
| 456 | null |
| 789 | null |
Segment Table
| SERNR | Sgmnt | FROM-Station | TO-Station |
|---|---|---|---|
| 123 | 01 | A | B |
| 123 | 02 | B | C |
| 123 | 03 | C | B |
| 123 | 04 | B | A |
| 456 | 01 | A | B |
| 456 | 02 | B | C |
| 456 | 01 | C | D |
| 456 | 01 | D | A |
| 789 | 04 | A | B |
I want to join these two DataFrames/tables and check the FROM and TO stations of each journey to decide its type: a return journey gets type A, a mirror return gets type B, and a one-way journey gets type C.
The type is determined as follows:
Let's say for journey SERNR 123 the segments are A->B, B->C, C->B, B->A. This is a mirror journey, because it's A-B-C and then C-B-A.
For 789 it's A->B, so it's a normal (one-way) journey.
For 456 it's A->B, B->C, C->D, D->A, in short A-B-C then C-D-A. This is a return, but not a mirror.
I really don't know how to compare rows within a DataFrame by SERNR, checking the FROM and TO stations of the same SERNR to decide the type.
I'd really appreciate a pointer on how to go ahead and implement this.
User response:
Use `collect_list` on the FROM or TO station, grouping by SERNR and ordering by segment.
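A minimal sketch of the same idea in plain Scala collections (no Spark), so the grouping and ordering logic can be checked locally. The `Segment` case class and `stationPaths` function are hypothetical names standing in for the Segment table's columns. It groups rows by SERNR, sorts each group by Sgmnt, and turns the legs into one station path: the first FROM followed by every TO.

```scala
// Hypothetical stand-in for one row of the Segment table
// (SERNR, Sgmnt, FROM-Station, TO-Station).
final case class Segment(sernr: Int, sgmnt: String, from: String, to: String)

// Group by SERNR, order each group by segment number, then build the path:
// the first FROM station followed by the TO station of every leg.
def stationPaths(segments: Seq[Segment]): Map[Int, Seq[String]] =
  segments.groupBy(_.sernr).map { case (sernr, segs) =>
    val ordered = segs.sortBy(_.sgmnt) // zero-padded strings: "01" < "02" < ...
    sernr -> (ordered.head.from +: ordered.map(_.to))
  }

// Rows for 123 are deliberately out of order to show that sorting restores them.
val segments = Seq(
  Segment(123, "03", "C", "B"),
  Segment(123, "01", "A", "B"),
  Segment(123, "04", "B", "A"),
  Segment(123, "02", "B", "C"),
  Segment(789, "04", "A", "B")
)

println(stationPaths(segments)(123).mkString("-")) // A-B-C-B-A
println(stationPaths(segments)(789).mkString("-")) // A-B
```

If segment numbers repeat within one SERNR (as for 456 in the sample table), ordering by Sgmnt cannot recover the real sequence; Scala's `sortBy` is stable, so tied rows simply keep their input order.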
User response:
You can collect the list of FROM-TO pairs into an array column for each SERNR, then join the array elements to get a journey_path (A-B-C...).
Once you have the journey path for each journey, you can use a `when` expression to determine the TYPE:
- If the first FROM != the last TO, it's `normal`.
- Else, if the reverse of the journey_path == the journey_path, it's a `mirror`; otherwise it's a `return`.
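The decision rule above can be sketched in plain Scala over an ordered list of station names (no Spark involved). `JourneyType.classify` is a hypothetical helper, and the labels match the ones above. Because it compares whole station names rather than characters of a joined string, it also handles multi-character station codes such as `AB`, where comparing the first and last characters of "AB-CD-CD-AB" would see 'A' vs 'B'.

```scala
// Hypothetical helper: classify a journey from the ordered stations it visits,
// e.g. Seq("A", "B", "C", "B", "A") for SERNR 123.
object JourneyType {
  def classify(stations: Seq[String]): String = {
    require(stations.nonEmpty, "a journey needs at least one station")
    if (stations.head != stations.last) "normal"    // ends somewhere other than its start
    else if (stations == stations.reverse) "mirror" // retraces the same stations in reverse
    else "return"                                   // back at the start by a different route
  }
}

println(JourneyType.classify(Seq("A", "B", "C", "B", "A"))) // mirror
println(JourneyType.classify(Seq("A", "B", "C", "D", "A"))) // return
println(JourneyType.classify(Seq("A", "B")))                // normal
println(JourneyType.classify(Seq("AB", "CD", "AB")))        // mirror
```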
Note that you need to use a Window to keep the segments in order when grouping and collecting the list of FROM-TOs.
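```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// One window per journey, ordered by segment number and spanning all of its rows,
// so every row sees the complete ordered list of segments.
val w = Window
  .partitionBy("SERNR")
  .orderBy("Sgmnt")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// segment_df is the Segment table as a DataFrame
val result = segment_df
  .select(
    col("SERNR"),
    // "A-B", "B-C", ... joined into one path such as "A-B-B-C-C-B-B-A"
    array_join(
      collect_list(concat_ws("-", col("FROM-Station"), col("TO-Station"))).over(w),
      "-"
    ).alias("journey_path")
  )
  // every row of a journey carries the same path, so keep one row per SERNR
  .dropDuplicates(Seq("SERNR"))
  // substring/reverse on the joined string assume single-character station codes
  .withColumn(
    "TYPE",
    when(
      substring(col("journey_path"), 1, 1) =!= substring(col("journey_path"), -1, 1),
      "normal"
    ).otherwise(
      when(reverse(col("journey_path")) === col("journey_path"), "mirror")
        .otherwise("return")
    )
  )
  .drop("journey_path")

result.show()
// +-----+------+
// |SERNR|  TYPE|
// +-----+------+
// |  789|normal|
// |  456|return|
// |  123|mirror|
// +-----+------+
```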
