Hi everyone, I'm new to Spark with Scala. I want to find the nearest values within each partition. My input looks something like this.
For example, in the first group value1 (3) lies between 2 and 7 in the value2 column:
+---+------+------+
|id |value1|value2|
+---+------+------+
|1  |3     |1     |
|1  |3     |2     |
|1  |3     |7     |
|2  |4     |2     |
|2  |4     |3     |
|2  |4     |8     |
|3  |5     |3     |
|3  |5     |6     |
|3  |5     |7     |
|3  |5     |8     |
+---+------+------+
My output should look like this:
+---+------+------+
|id |value1|value2|
+---+------+------+
|1  |3     |2     |
|1  |3     |7     |
|2  |4     |3     |
|2  |4     |8     |
|3  |5     |3     |
|3  |5     |6     |
+---+------+------+
Can someone guide me on how to solve this, please?
CodePudding user response:
Since you appear to want to learn, I've provided pseudocode and references instead of a finished code answer, so that you can work out the details yourself.
- Group the elements (select id, value1) and aggregate value2 with `collect_list`, so that all the value2 values of a group end up in one array.
- Select id and the `collect_list` array with value1 added to it (`concat`), then sort the array.
- Find value1 in the sorted array (`array_position`), then slice the array to retrieve the value before and the value after the position that `array_position` returned.
- If the array has fewer than 3 elements, do error handling.
- The first value and the last value of the sliced array are now your 'closest numbers'.
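A minimal sketch of one way to reach the same result in Spark, offered as an illustration rather than the method described above: it swaps the array bookkeeping (`collect_list`, sort, `array_position`) for conditional aggregation, taking the largest value2 below value1 and the smallest value2 above it per group. The names `NearestValues`, `nearest`, `below` and `above` are hypothetical, and the local `SparkSession` is an assumption made only so the example can run on its own.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, min, when}

object NearestValues {
  // Keeps, per (id, value1) group, the rows whose value2 is the closest value
  // strictly below value1 or the closest value strictly above it.
  def nearest(df: DataFrame): DataFrame = {
    val bounds = df
      .groupBy("id", "value1")
      .agg(
        // when() without otherwise() yields null, which max/min ignore
        max(when(col("value2") < col("value1"), col("value2"))).as("below"),
        min(when(col("value2") > col("value1"), col("value2"))).as("above")
      )

    // A group with no value on one side gets null for that bound;
    // the comparison with null is never true, so only the other side is kept.
    df.join(bounds, Seq("id", "value1"))
      .filter(col("value2") === col("below") || col("value2") === col("above"))
      .select("id", "value1", "value2")
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (an assumption, not from the question)
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nearest-values")
      .getOrCreate()
    import spark.implicits._

    // Sample input copied from the question
    val df = Seq(
      (1, 3, 1), (1, 3, 2), (1, 3, 7),
      (2, 4, 2), (2, 4, 3), (2, 4, 8),
      (3, 5, 3), (3, 5, 6), (3, 5, 7), (3, 5, 8)
    ).toDF("id", "value1", "value2")

    nearest(df).orderBy("id", "value2").show()
    spark.stop()
  }
}
```

On the sample input this should keep the six rows listed as the desired output in the question. Rows where value2 equals value1 are excluded by the strict comparisons, and duplicate value2 rows at a bound are all kept; adjust either if your data needs different behaviour.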
CodePudding user response:
You will need window functions for this.
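import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{asc, col, lag, lead, min}

// Ordered window: lag/lead use it to look at the neighbouring rows.
val window = Window
  .partitionBy("id", "value1")
  .orderBy(asc("value2"))
// Unordered window: min() then covers the whole group instead of a running frame.
val groupWindow = Window.partitionBy("id", "value1")
val result = df
  .withColumn("prev", lag("value2", 1).over(window))
  .withColumn("next", lead("value2", 1).over(window))
  .withColumn("dist_prev", col("value2").minus(col("prev")))
  .withColumn("dist_next", col("next").minus(col("value2")))
  .withColumn("min", min(col("dist_prev")).over(groupWindow))
  .filter(col("dist_prev") === col("min") || col("dist_next") === col("min"))
  .drop("prev", "next", "dist_prev", "dist_next", "min")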
I haven't tested it, so treat it as an illustration of the concept rather than a working, ready-to-use example.
Here is what's going on:
- First, create a `window` that describes your grouping rule: we want the rows grouped by the first two columns, and sorted by the third one within each group.
- Next, add `prev` and `next` columns to the dataframe that contain the value of the `value2` column from the previous and next row within the group respectively. (`prev` will be null for the first row in the group, and `next` will be null for the last row; that is ok.)
- Add `dist_prev` and `dist_next` to contain the distance between `value2` and the `prev` and `next` value respectively. (Note that `dist_prev` for each row will have the same value as `dist_next` for the previous row.)
- Find the minimum value of `dist_prev` within each group, and add it as the `min` column. (Note that the minimum value of `dist_next` is the same by construction, so we only need one column here.)
- Filter the rows, selecting those that have the minimum value in either `dist_next` or `dist_prev`. This finds the tightest pair unless there are multiple rows with the same distance from each other. This case was not accounted for in your question, so we don't know what kind of behavior you want in this case. This implementation will simply return all of these rows.
- Finally, drop all extra columns that were added to the dataframe to return it to its original shape.
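To see what the filter keeps when several gaps tie, the steps above can be traced on a single group with plain Scala collections, no Spark involved. This is only a sketch under stated assumptions: `TightestPairs` and `tightest` are hypothetical names, `Option` stands in for the nulls that `lag` and `lead` produce at the group edges, and the input is the value2 list of one (id, value1) group.

```scala
object TightestPairs {
  // Returns the value2 entries that take part in a minimal adjacent gap,
  // mirroring the prev/next/dist_prev/dist_next/min/filter steps.
  def tightest(values: Seq[Int]): Seq[Int] = {
    val sorted = values.sorted
    // prev/next play the role of lag/lead: None at the edges of the group
    val prev: Seq[Option[Int]] = None +: sorted.dropRight(1).map(Option(_))
    val next: Seq[Option[Int]] = sorted.drop(1).map(Option(_)) :+ None

    // One tuple per row: (value2, dist_prev, dist_next)
    val rows = sorted.zip(prev).zip(next).map { case ((v, p), n) =>
      (v, p.map(v - _), n.map(_ - v))
    }

    val distPrevs = rows.flatMap(_._2)
    if (distPrevs.isEmpty) Seq.empty // fewer than two rows: no gap to compare
    else {
      val minDist = distPrevs.min // group-wide minimum gap
      rows.collect {
        case (v, dp, dn) if dp.contains(minDist) || dn.contains(minDist) => v
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // value2 values of the id = 3 group from the question
    println(tightest(Seq(3, 6, 7, 8)))
  }
}
```

For the id = 3 group the gaps 6 to 7 and 7 to 8 are both 1, so 6, 7 and 8 are all kept, which is the tie behaviour described in the filter step.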
