There are two DataFrames. I want to select only those devices from the first DataFrame that fulfill these conditions:
- If a PatternDS row has patterns in both Pat1 and Pat2, the device must match both of them.
- If either Pat1 or Pat2 in a PatternDS row is NA, the device only needs to match the other side's pattern.
I can do this with a UDF and some loops, but I want to do it with joins. Any hints appreciated.
DevicesDS:
| DeviceId | Pattern |
| -------- | ---------- |
| D1 | Dr_123_5.0 |
| D2 | Dr_456_6.0 |
| D3 | Ap_111_3.5 |
| D1 | Ap_333_4.5 |
| D2 | OE_222_7.7 |
| D4 | Dr_123_5.0 |
PatternDS:
| Pat1 | Pat2 |
| --------------| -------------- |
|Dr_123_5.0 | Ap_333_4.5 |
|NA | OE_222_7.7 |
|Ap_111_3.5 | NA |
```scala
val result = DevicesDS.groupBy("deviceId","Pattern").count().groupBy("deviceId").agg(collect_set(struct("Pattern")).as("Pat"))
```
This gives me two columns from DevicesDS: the first is DeviceId and the second is a collect_set of its patterns.
Now I need to join this with PatternDS.
Expected output:
- D1 has both Pat1 (Dr_123_5.0) and Pat2 (Ap_333_4.5) from row 1, so it should be included.
- D2 has Pat2 (OE_222_7.7) and Pat1 for that row is NA, so it should be included.
- D3 has Pat1 (Ap_111_3.5) and Pat2 for that row is NA, so it should be included.
- D4 has Pat1 (Dr_123_5.0) from row 1 but does not have the matching Pat2, so it is not eligible.
So the final result is:
| DeviceId | Patterns |
| -------- | ---------- |
| D1 | array of Patterns |
| D2 | array of Patterns |
| D3 | array of Patterns |
Note that D4 is not in this list because it does not meet the criteria. Patterns contains the array of matching patterns.
Answer:
Assuming these are your input dataframes:
```scala
// Assumes a SparkSession named `spark` (as in spark-shell); needed for .toDF
import spark.implicits._

val DevicesDS = Seq(
  ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
  ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
).toDF("DeviceId", "Pattern")

val PatternDS = Seq(
  ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"), ("Ap_111_3.5", "NA")
).toDF("Pat1", "Pat2")
```
First, group DevicesDS by DeviceId to get the set of patterns associated with each device:
```scala
import org.apache.spark.sql.functions._

val DevicesDSGrouped = DevicesDS.groupBy("deviceId").agg(collect_set($"Pattern").as("Patterns"))
DevicesDSGrouped.show(false)
// +--------+------------------------+
// |deviceId|Patterns                |
// +--------+------------------------+
// |D1      |[Dr_123_5.0, Ap_333_4.5]|
// |D3      |[Ap_111_3.5]            |
// |D2      |[OE_222_7.7, Dr_456_6.0]|
// |D4      |[Dr_123_5.0]            |
// +--------+------------------------+
```
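To make the matching rule concrete before writing the join, here is a minimal plain-Scala sketch (no Spark involved; the object `EligibilityRule` and its members are hypothetical names, not part of any library). It builds the same DeviceId-to-patterns grouping on the sample rows and then applies the two conditions from the question directly, one PatternDS row at a time:

```scala
// Plain-Scala model of the eligibility rule (no Spark); all names are hypothetical.
object EligibilityRule {
  val NA = "NA"

  // Sample data from the question: (DeviceId, Pattern) and (Pat1, Pat2).
  val devices: Seq[(String, String)] = Seq(
    "D1" -> "Dr_123_5.0", "D2" -> "Dr_456_6.0", "D2" -> "OE_222_7.7",
    "D3" -> "Ap_111_3.5", "D1" -> "Ap_333_4.5", "D4" -> "Dr_123_5.0"
  )
  val patternRows: Seq[(String, String)] = Seq(
    "Dr_123_5.0" -> "Ap_333_4.5", NA -> "OE_222_7.7", "Ap_111_3.5" -> NA
  )

  // Same shape as DevicesDSGrouped: DeviceId -> distinct patterns.
  val byDevice: Map[String, Set[String]] =
    devices.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).toSet }

  // A device satisfies a row if it has both patterns, or just the
  // non-NA one when the other side is NA.
  def satisfies(devicePatterns: Set[String], row: (String, String)): Boolean = {
    val (pat1, pat2) = row
    if (pat1 == NA) devicePatterns.contains(pat2)
    else if (pat2 == NA) devicePatterns.contains(pat1)
    else devicePatterns.contains(pat1) && devicePatterns.contains(pat2)
  }

  // Keep every device that satisfies at least one PatternDS row.
  val eligible: Set[String] =
    byDevice.collect { case (id, pats) if patternRows.exists(row => satisfies(pats, row)) => id }.toSet
}
```

On the sample data `eligible` contains D1, D2 and D3, which is the expected result from the question; whatever join condition is used in Spark has to reproduce this per-row test.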
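Then join with PatternDS, using array_except in the join condition: array_remove drops "NA" from array(Pat1, Pat2), and a row matches when array_except leaves nothing, i.e. every remaining pattern is in the device's Patterns. This covers both cases: both patterns present, or one present and the other NA. Finally, group by DeviceId and collect columns Pat1 and Pat2, removing the NA placeholders (array_except, array_remove and flatten require Spark 2.4 or later):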
```scala
// A PatternDS row matches when none of its non-"NA" patterns is missing from Patterns
val joinCondition = size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0

val result = DevicesDSGrouped.join(PatternDS, joinCondition)
  .groupBy("deviceId")
  .agg(
    // Collect the matched Pat1/Pat2 pairs, flatten them and drop the "NA" placeholders
    array_remove(flatten(collect_list(array($"Pat1", $"Pat2"))), "NA").as("Patterns")
  )

result.show(false)
// +--------+------------------------+
// |deviceId|Patterns                |
// +--------+------------------------+
// |D1      |[Dr_123_5.0, Ap_333_4.5]|
// |D3      |[Ap_111_3.5]            |
// |D2      |[OE_222_7.7]            |
// +--------+------------------------+
```
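The join condition is essentially a subset test on each PatternDS row. As a hedged illustration using plain Scala collections rather than Spark (`JoinConditionSketch`, `required` and `rowMatches` are hypothetical names), the sketch below mirrors `size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0`:

```scala
// Plain-Scala mirror of the Spark join condition (no Spark); names are hypothetical.
object JoinConditionSketch {
  // Like array_remove(array(Pat1, Pat2), "NA"): the patterns this row requires.
  def required(pat1: String, pat2: String): Set[String] =
    Set(pat1, pat2) - "NA"

  // Like size(array_except(required, Patterns)) === 0: array_except returns the
  // distinct required patterns absent from Patterns, so "size 0" means subset.
  def rowMatches(pat1: String, pat2: String, devicePatterns: Seq[String]): Boolean =
    required(pat1, pat2).subsetOf(devicePatterns.toSet)
}
```

For example, `rowMatches("Dr_123_5.0", "Ap_333_4.5", Seq("Dr_123_5.0"))` is false, which is why D4 is dropped. One consequence of this formulation: a PatternDS row with NA in both Pat1 and Pat2 requires nothing, so `rowMatches("NA", "NA", ...)` is true for any device, and the Spark condition behaves the same way. If such rows can occur, filtering them out of PatternDS before the join avoids matching every device.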
