I am trying to group users who have used the same IP over overlapping date ranges.
This will tell me whether two users share the same house, because they had the same IP at the same time.
I've been trying to implement it, but I can't find a way to do it with PySpark SQL. In fact, I suspect it can't be done with PySpark alone and probably requires some other graph-oriented library.
The problem is the following:
| ip | user | start_date | end_date |
| ----------- | ---------- | ---------- | ---------- |
| 192.168.1.1 | a | 2022-01-01 | 2022-01-03 |
| 192.168.1.1 | a | 2022-01-05 | 2022-01-07 |
| 192.168.1.1 | b | 2022-01-06 | 2022-01-09 |
| 192.168.1.1 | c | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | d | 2022-01-08 | 2022-01-11 |
| 192.168.1.2 | e | 2022-01-10 | 2022-01-11 |
| 192.168.1.2 | f | 2022-01-16 | 2022-01-18 |
As we can see:
- the users a and b overlap in range and share the same ip.
- the users b and c overlap in range and share the same ip.
- indirectly, the users a and c are in the same group.
- the users d and e overlap in range and share the same ip.
- the user f does not overlap with any other user.
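The overlap rule in the list above can be written as a small predicate. This is a minimal sketch in plain Python rather than Spark, assuming both ends of a range are inclusive (the question does not say). The helper name overlaps is hypothetical. It also shows why a and c are only linked indirectly:

```python
from datetime import date

def overlaps(r1, r2):
    """True if two inclusive (start, end) date ranges share at least one day.
    Inclusive bounds are an assumption, not stated in the question."""
    (s1, e1), (s2, e2) = r1, r2
    return s1 <= e2 and s2 <= e1

# Sessions taken from the input table
a2 = (date(2022, 1, 5), date(2022, 1, 7))   # second session of user a
b = (date(2022, 1, 6), date(2022, 1, 9))
c = (date(2022, 1, 8), date(2022, 1, 11))
d = (date(2022, 1, 8), date(2022, 1, 11))
e = (date(2022, 1, 10), date(2022, 1, 11))
f = (date(2022, 1, 16), date(2022, 1, 18))

print(overlaps(a2, b))                    # True
print(overlaps(b, c))                     # True
print(overlaps(a2, c))                    # False: a and c are linked only through b
print(overlaps(d, e))                     # True
print(overlaps(d, f) or overlaps(e, f))   # False: f is on its own
```

Because the "same group" relation is transitive while the pairwise overlap test is not, a plain self-join on overlapping ranges is not enough; the links have to be followed through (a to b to c).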
Expected output:
| ip | users | date_ranges |
| ----------- | ----------- | ------------------- |
| 192.168.1.1 | {a, b, c} | {2022-01-01 - 2022-01-03, 2022-01-05 - 2022-01-07, 2022-01-06 - 2022-01-09, 2022-01-08 - 2022-01-11} |
| 192.168.1.2 | {d, e} | {2022-01-08 - 2022-01-11, 2022-01-10 - 2022-01-11} |
| 192.168.1.2 | {f} | {2022-01-16 - 2022-01-18} |
Do you have any ideas on how to implement this?
I thought about using GraphFrames, but I don't even know where to start :S
Answer:
One way to identify overlapping date intervals is to use a cumulative conditional sum over a window partitioned by ip and ordered by start_date. For each row, if start_date is later than the maximum end_date of all the preceding rows, the row doesn't overlap any of them, so it starts a new group:
```python
from pyspark.sql import functions as F, Window

# df is the input DataFrame with columns ip, user, start_date, end_date
w = Window.partitionBy("ip").orderBy("start_date")

df1 = df.withColumn(
    # running max of end_date up to and including the current row
    "previous_end", F.max("end_date").over(w)
).withColumn(
    # flag = 1 when the row starts after every earlier session has ended;
    # the running sum of the flags is a group id within each ip
    "group",
    F.sum(F.when(F.lag("previous_end").over(w) < F.col("start_date"), 1).otherwise(0)).over(w)
).groupBy("ip", "group").agg(
    F.collect_list(
        F.struct("user", F.struct("start_date", "end_date").alias("date_ranges"))
    ).alias("sessions")
).select(
    "ip", "sessions.user", "sessions.date_ranges"
)

df1.show(truncate=False)
#+-----------+---------+------------------------------------------------------------------------------+
#|ip         |user     |date_ranges                                                                   |
#+-----------+---------+------------------------------------------------------------------------------+
#|192.168.1.1|[a]      |[{2022-01-01, 2022-01-03}]                                                    |
#|192.168.1.1|[a, b, c]|[{2022-01-05, 2022-01-07}, {2022-01-06, 2022-01-09}, {2022-01-08, 2022-01-11}]|
#|192.168.1.2|[d, e]   |[{2022-01-08, 2022-01-11}, {2022-01-10, 2022-01-11}]                          |
#|192.168.1.2|[f]      |[{2022-01-16, 2022-01-18}]                                                    |
#+-----------+---------+------------------------------------------------------------------------------+
```
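To see where the group ids come from, here is a plain-Python sketch that mirrors the window logic row by row: per ip, ordered by start_date, a new group starts when start_date is later than the maximum end_date seen so far. The function name assign_groups is hypothetical, and this sketch does not reproduce Spark's handling of rows that share the same start_date (the default ordered window frame treats such rows as peers). It also makes visible that user a's first session ends up in a group of its own, whereas the question's expected output puts all of a's sessions into {a, b, c}:

```python
from itertools import groupby

# Sample rows from the question: (ip, user, start_date, end_date).
# ISO-8601 date strings compare in chronological order, as in the Spark code.
rows = [
    ("192.168.1.1", "a", "2022-01-01", "2022-01-03"),
    ("192.168.1.1", "a", "2022-01-05", "2022-01-07"),
    ("192.168.1.1", "b", "2022-01-06", "2022-01-09"),
    ("192.168.1.1", "c", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "d", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "e", "2022-01-10", "2022-01-11"),
    ("192.168.1.2", "f", "2022-01-16", "2022-01-18"),
]

def assign_groups(rows):
    """Return (ip, user, start, end, group) tuples, with group computed like the
    cumulative conditional sum over Window.partitionBy("ip").orderBy("start_date")."""
    out = []
    ordered = sorted(rows, key=lambda r: (r[0], r[2]))
    for ip, part in groupby(ordered, key=lambda r: r[0]):
        previous_max_end = None  # plays the role of lag("previous_end")
        group = 0                # running sum of the new-group flags
        for _, user, start, end in part:
            if previous_max_end is not None and previous_max_end < start:
                group += 1
            out.append((ip, user, start, end, group))
            previous_max_end = end if previous_max_end is None else max(previous_max_end, end)
    return out

for row in assign_groups(rows):
    print(row)
```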
Answer:
First of all, let's create your test case:
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName('UserLogsAggregation').getOrCreate()

# Dates are kept as ISO-8601 strings, which still sort chronologically
schema = StructType([
    StructField("ip", StringType()),
    StructField("user", StringType()),
    StructField("start_date", StringType()),
    StructField("end_date", StringType()),
])

data = [('192.168.1.1', 'a', '2022-01-01', '2022-01-03'),
        ('192.168.1.1', 'a', '2022-01-05', '2022-01-07'),
        ('192.168.1.1', 'b', '2022-01-06', '2022-01-09'),
        ('192.168.1.1', 'c', '2022-01-08', '2022-01-11'),
        ('192.168.1.2', 'd', '2022-01-08', '2022-01-11'),
        ('192.168.1.2', 'e', '2022-01-10', '2022-01-11'),
        ('192.168.1.2', 'f', '2022-01-16', '2022-01-18')]

# createDataFrame accepts a local list directly; no RDD is needed
user_log = spark.createDataFrame(data, schema)
```
Now, let's create a temp view. The temp view allows us to process the dataframe by using SQL.
```python
user_log.createOrReplaceTempView('user_log')
```
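Next, group by ip with the built-in collect_set aggregate function (see https://spark.apache.org/docs/latest/sql-ref-functions-builtin.html):
```python
# (start_date, end_date) builds a struct, so each range is kept as a pair;
# collect_set removes duplicates and does not guarantee any element order
spark.sql(
    """select
         ip,
         collect_set(user) as users,
         collect_set((start_date, end_date)) as date_ranges
       from user_log
       group by ip"""
).show()
```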
The last step is to "explode" the non-overlapping time intervals into separate groups. It is left as an exercise to the reader.
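One possible way to do that last step, sketched in plain Python rather than Spark: treat each (ip, user) pair as a node, link two nodes when any of their sessions on that ip overlap (inclusive bounds assumed), and take connected components with a union-find. This is the connected-components idea the question alludes to with GraphFrames, swapped in here; it is not the answer author's method. It works on the raw rows rather than on the query output, because that query collects users and date ranges into separate sets and so loses which range belongs to which user. The function name group_users is hypothetical, and running this at scale (for example through GraphFrames' connectedComponents) is not shown:

```python
from collections import defaultdict

rows = [
    ("192.168.1.1", "a", "2022-01-01", "2022-01-03"),
    ("192.168.1.1", "a", "2022-01-05", "2022-01-07"),
    ("192.168.1.1", "b", "2022-01-06", "2022-01-09"),
    ("192.168.1.1", "c", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "d", "2022-01-08", "2022-01-11"),
    ("192.168.1.2", "e", "2022-01-10", "2022-01-11"),
    ("192.168.1.2", "f", "2022-01-16", "2022-01-18"),
]

def group_users(rows):
    """Group users per ip; users are linked when any two of their sessions on that
    ip overlap, and links are followed transitively with a union-find."""
    parent = {}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    def union(x, y):
        parent[find(x)] = find(y)

    by_ip = defaultdict(list)
    for ip, user, start, end in rows:
        by_ip[ip].append((user, start, end))
        parent.setdefault((ip, user), (ip, user))

    # Compare every pair of sessions on the same ip (quadratic per ip; fine for a sketch)
    for ip, sessions in by_ip.items():
        for i, (u1, s1, e1) in enumerate(sessions):
            for u2, s2, e2 in sessions[i + 1:]:
                if s1 <= e2 and s2 <= e1:
                    union((ip, u1), (ip, u2))

    # Collect users and all their date ranges under each component root
    groups = defaultdict(lambda: {"users": set(), "date_ranges": []})
    for ip, user, start, end in rows:
        g = groups[find((ip, user))]
        g["ip"] = ip
        g["users"].add(user)
        g["date_ranges"].append((start, end))
    return list(groups.values())

for g in group_users(rows):
    print(g["ip"], sorted(g["users"]), g["date_ranges"])
```

Unlike the window-based answer, this keeps all of user a's sessions together, which matches the shape of the expected output in the question.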
