How to join two dataframes where column matches with two columns in the second dataframe?

Time:01-12

There are two DataFrames. I want to select only those devices from the first DataFrame (DevicesDS) that fulfill these conditions:

  1. If a PatternDS row has patterns in both Pat1 and Pat2, a device matches when it has both of those patterns.
  2. If either Pat1 or Pat2 is NA in a PatternDS row, a device matches when it has the pattern on the other side.

I can do this with a UDF and some loops, but I want to do it with joins. Any hints are appreciated.

DevicesDS:

| DeviceId | Pattern    |
| -------- | ---------- |
| D1       | Dr_123_5.0 |
| D2       | Dr_456_6.0 |
| D3       | Ap_111_3.5 |
| D1       | Ap_333_4.5 |
| D2       | OE_222_7.7 |
| D4       | Dr_123_5.0 |

PatternDS:

| Pat1       | Pat2       |
| ---------- | ---------- |
| Dr_123_5.0 | Ap_333_4.5 |
| NA         | OE_222_7.7 |
| Ap_111_3.5 | NA         |

This is my attempt so far:

val result = DevicesDS.groupBy("deviceId","Pattern").count().groupBy("deviceId").agg(collect_set(struct("Pattern")).as("Pat"))

This gives me two columns from DevicesDS: the first is DeviceId and the second is the collect_set of that device's patterns.

Now I need to join this with PatternDS.

Expected Output:

  1. D1 matches both Pat1 (Dr_123_5.0) and Pat2 (Ap_333_4.5) of row 1, so it should be included.
  2. D2 has Pat2 (OE_222_7.7) and Pat1 for that row is NA, so it should be included.
  3. D3 has Pat1 (Ap_111_3.5) and Pat2 for that row is NA, so it should be included.
  4. D4 has Pat1 (Dr_123_5.0) from row 1 but not the corresponding Pat2 (Ap_333_4.5), so it is not eligible.

So the final result is:

| DeviceId | Patterns          |
| -------- | ----------        |
| D1       | array of Patterns |
| D2       | array of Patterns |
| D3       | array of Patterns |

D4 is not in this list because it does not meet the criteria. Patterns contains the array of matching patterns.
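The two conditions can be written down as one predicate, which makes the edge cases easier to see. Below is a minimal sketch on plain Scala collections rather than Spark, assuming "NA" is the literal string used for a missing pattern (as in the sample data); `EligibilityCheck`, `patternRows` and `isEligible` are hypothetical names.

```scala
object EligibilityCheck {
  // PatternDS rows as (Pat1, Pat2); "NA" marks a missing side (assumed, as in the sample data)
  val patternRows: Seq[(String, String)] = Seq(
    ("Dr_123_5.0", "Ap_333_4.5"),
    ("NA", "OE_222_7.7"),
    ("Ap_111_3.5", "NA")
  )

  // A device is eligible if at least one row exists whose non-NA patterns
  // are all contained in the device's own set of patterns
  def isEligible(devicePatterns: Set[String]): Boolean =
    patternRows.exists { case (p1, p2) =>
      Seq(p1, p2).filter(_ != "NA").forall(devicePatterns.contains)
    }
}
```

With the sample data this accepts D1, D2 and D3 and rejects D4, matching the expected output. One consequence of the rule as stated: a PatternDS row with NA in both columns has no required patterns, so it would make every device eligible.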

Answer:

Assuming these are your input dataframes:

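import org.apache.spark.sql.functions._
import spark.implicits._ // for toDF and $"..."; assumes a SparkSession named spark, as in spark-shell

val DevicesDS = Seq(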
  ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
  ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
).toDF("DeviceId", "Pattern")

val PatternDS = Seq(
  ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"),("Ap_111_3.5", "NA")
).toDF("Pat1", "Pat2")

First, group DevicesDS by DeviceId to get the set of patterns associated with each device:

val DevicesDSGrouped = DevicesDS.groupBy("deviceId").agg(collect_set($"Pattern").as("Patterns"))

DevicesDSGrouped.show(false)
//+--------+------------------------+
//|deviceId|Patterns                |
//+--------+------------------------+
//|D1      |[Dr_123_5.0, Ap_333_4.5]|
//|D3      |[Ap_111_3.5]            |
//|D2      |[OE_222_7.7, Dr_456_6.0]|
//|D4      |[Dr_123_5.0]            |
//+--------+------------------------+
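The grouped frame above is simply a map from each device to its distinct patterns. Here is a minimal plain-Scala sketch of the same grouping (no Spark; `GroupPatterns`, `devices` and `grouped` are hypothetical names). A `Set` is used because `collect_set` removes duplicates and does not guarantee element order, which is why D2's patterns can come out as `[OE_222_7.7, Dr_456_6.0]`.

```scala
object GroupPatterns {
  // DevicesDS rows as (DeviceId, Pattern), copied from the sample data
  val devices: Seq[(String, String)] = Seq(
    ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
    ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
  )

  // Same shape as groupBy("deviceId").agg(collect_set($"Pattern")):
  // one entry per device holding the distinct patterns seen for it
  val grouped: Map[String, Set[String]] =
    devices.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).toSet }
}
```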

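Then, join with PatternDS. The join condition uses array_except to check that every pattern in the PatternDS row, after removing NA, is present in the device's Patterns; this covers both the case where both patterns must match and the case where one side is NA. Finally, group by DeviceId and collect the matched Pat1 and Pat2 values, removing NA: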

val joinCondition = size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0

val result = DevicesDSGrouped.join(PatternDS, joinCondition)
  .groupBy("deviceId")
  .agg(
    array_remove(flatten(collect_list(array($"Pat1", $"Pat2"))), "NA").as("Patterns")
  )

result.show(false)
//+--------+------------------------+
//|deviceId|Patterns                |
//+--------+------------------------+
//|D1      |[Dr_123_5.0, Ap_333_4.5]|
//|D3      |[Ap_111_3.5]            |
//|D2      |[OE_222_7.7]            |
//+--------+------------------------+
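To see why D4 drops out, the join and aggregation can be modeled on plain Scala collections: the join condition keeps a (device, row) pair when the row's non-NA patterns are a subset of the device's patterns, and the aggregation concatenates each matched row's Pat1/Pat2 and strips NA. This is an illustrative sketch, not Spark code; `JoinModel`, `joinCondition` and `result` are hypothetical names, and the grouped input is copied from the `DevicesDSGrouped` output.

```scala
object JoinModel {
  // Grouped DevicesDS: device -> distinct patterns
  val devices: Map[String, Set[String]] = Map(
    "D1" -> Set("Dr_123_5.0", "Ap_333_4.5"),
    "D2" -> Set("OE_222_7.7", "Dr_456_6.0"),
    "D3" -> Set("Ap_111_3.5"),
    "D4" -> Set("Dr_123_5.0")
  )

  // PatternDS rows as (Pat1, Pat2)
  val patternRows: Seq[(String, String)] = Seq(
    ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"), ("Ap_111_3.5", "NA")
  )

  // Mirrors the Spark condition: (row patterns without "NA") minus Patterns is empty
  def joinCondition(patterns: Set[String], row: (String, String)): Boolean =
    (Seq(row._1, row._2).filter(_ != "NA").toSet -- patterns).isEmpty

  // Inner join, then group by device, flatten the matched (Pat1, Pat2) pairs and drop "NA"
  val result: Map[String, Seq[String]] =
    (for {
      (id, patterns) <- devices.toSeq
      row <- patternRows if joinCondition(patterns, row)
    } yield id -> Seq(row._1, row._2))
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.flatMap(_._2).filter(_ != "NA") }
}
```

Because the inner join keeps only devices with at least one matching row, D4 never reaches the aggregation. If a device can match several PatternDS rows that share a pattern, both this model and the Spark `collect_list` version will repeat that pattern; wrapping the Spark expression in `array_distinct` (Spark 2.4+) would remove the duplicates.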