What is the best way to build event counts for certain time resolution over multiple names in Spark

Time:02-02

Let's say I have the following Spark DataFrame:

+-------------------+--------+
|timestamp          |UserName|
+-------------------+--------+
|2021-08-11 04:05:06|A       |
|2021-08-11 04:15:06|B       |
|2021-08-11 09:15:26|A       |
|2021-08-11 11:04:06|B       |
|2021-08-11 14:55:16|A       |
|2021-08-13 04:12:11|B       |
+-------------------+--------+

I want to build time-series data at a chosen time resolution from the event counts of each user.

  • Note1: obviously, after grouping by UserName and counting per time frame/resolution, the time frames need to be kept in the Spark DataFrame (maybe by using event-time aggregation and watermarking from Apache Spark's Structured Streaming).
  • Note2: missing time frames need to be filled in, with 0 where there are no events.
  • Note3: I'm not interested in using a UDF or hacking it via toPandas().

For example, for a 24-hour (daily) time frame, the expected result after groupBy would look like this:

+------------------------------------------+-------------+-------------+
|window_frame_24_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 23:59:59}|3            |2            |
|{2021-08-12 00:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 23:59:59}|0            |1            |
+------------------------------------------+-------------+-------------+

Edit1: for a 12-hour time frame/resolution:

+------------------------------------------+-------------+-------------+
|window_frame_12_Hours                     | username A  | username B  |
+------------------------------------------+-------------+-------------+
|{2021-08-11 00:00:00, 2021-08-11 11:59:59}|2            |2            |
|{2021-08-11 12:00:00, 2021-08-11 23:59:59}|1            |0            |
|{2021-08-12 00:00:00, 2021-08-12 11:59:59}|0            |0            |
|{2021-08-12 12:00:00, 2021-08-12 23:59:59}|0            |0            |
|{2021-08-13 00:00:00, 2021-08-13 11:59:59}|0            |1            |
|{2021-08-13 12:00:00, 2021-08-13 23:59:59}|0            |0            |
+------------------------------------------+-------------+-------------+

CodePudding user response:

Group by a '1 day' time window and UserName to get the counts, then group by the window frame and pivot on the user names:

from pyspark.sql import functions as F

result = df.groupBy(
    F.window("timestamp", "1 day").alias("window_frame_24_Hours"),
    "UserName"
).count().groupBy("window_frame_24_Hours").pivot("UserName").agg(
   F.first("count")
).na.fill(0)

result.show(truncate=False)

#+------------------------------------------+---+---+
#|window_frame_24_Hours                     |A  |B  |
#+------------------------------------------+---+---+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0  |1  |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3  |2  |
#+------------------------------------------+---+---+
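
The window ends printed above differ from the 23:59:59 ends in the question because F.window produces half-open intervals [start, end) whose boundaries are aligned, by default, to the Unix epoch. The rule can be sketched in plain Python as below. This is only an illustration of the bucketing arithmetic, assuming UTC timestamps; `tumbling_window` is a hypothetical helper and not part of Spark.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def tumbling_window(ts, size):
    """Return the half-open [start, end) window of length `size` that contains `ts`.

    Hypothetical helper for illustration; assumes `ts` is timezone-aware (UTC).
    """
    # Snap down to the nearest multiple of `size` counted from the epoch.
    start = ts - (ts - EPOCH) % size
    return start, start + size

# Last event of user A in the sample data.
event = datetime(2021, 8, 11, 14, 55, 16, tzinfo=timezone.utc)
day_window = tumbling_window(event, timedelta(days=1))         # 2021-08-11 00:00 to 2021-08-12 00:00
half_day_window = tumbling_window(event, timedelta(hours=12))  # 2021-08-11 12:00 to 2021-08-12 00:00
```

In Spark itself, the 12-hour resolution from the question corresponds to F.window("timestamp", "12 hours"). Because the alignment is relative to the epoch in UTC, daily windows may not start at local midnight when the session time zone is not UTC.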

If you need the missing dates, generate all dates with sequence between the min and max timestamps, then join the result with the original DataFrame:

# Build one {start, end} frame per day between the first and last event,
# then pair every frame with every user.
intervals_df = df.withColumn(
    "timestamp",
    F.date_trunc("day", "timestamp")
).selectExpr(
    "sequence(min(timestamp), max(timestamp + interval 1 day), interval 1 day) as dates"
).select(
    F.explode(
        F.expr("transform(dates, (x, i) -> IF(i!=0, struct(date_trunc('dd', dates[i-1]) as start, dates[i] as end), null))")
    ).alias("frame")
).filter("frame is not null").crossJoin(
    df.select("UserName").distinct()
)

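# Left join keeps frames without events; count("b.UserName") ignores the
# resulting nulls, so empty frames come out as 0.
result = intervals_df.alias("a").join(
    df.alias("b"),
    # Half-open match [start, end): an event exactly at midnight belongs to one frame only.
    (F.col("b.timestamp") >= F.col("frame.start"))
    & (F.col("b.timestamp") < F.col("frame.end"))
    & (F.col("a.UserName") == F.col("b.UserName")),
    "left"
).groupBy(
    F.col("frame").alias("window_frame_24_Hours")
).pivot("a.UserName").agg(
    F.count("b.UserName")
)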

result.show(truncate=False)

#+------------------------------------------+----------+----------+
#|window_frame_24_Hours                     |username_A|username_B|
#+------------------------------------------+----------+----------+
#|{2021-08-13 00:00:00, 2021-08-14 00:00:00}|0         |1         |
#|{2021-08-11 00:00:00, 2021-08-12 00:00:00}|3         |2         |
#|{2021-08-12 00:00:00, 2021-08-13 00:00:00}|0         |0         |
#+------------------------------------------+----------+----------+
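
When the resolution is changed (for example to the 12-hour frames from the question), it can help to have a small reference result to compare the Spark output against. The sketch below reproduces the gap-filled counts in plain Python for the sample data. It is meant only as a check on tiny samples, not as a replacement for the Spark job; `expected_counts` is a hypothetical helper, and it assumes the step divides 24 hours evenly.

```python
from collections import Counter
from datetime import datetime, timedelta

def expected_counts(events, step):
    """Return {window_start: {user: count}} with zero-filled gaps.

    events: iterable of (timestamp, user) pairs; step: window length as a timedelta.
    Windows run from midnight of the first event's day up to midnight after the
    last event's day, each covering the half-open range [start, start + step).
    """
    events = list(events)
    users = sorted({user for _, user in events})
    midnight = dict(hour=0, minute=0, second=0, microsecond=0)
    first = min(ts for ts, _ in events).replace(**midnight)
    last = max(ts for ts, _ in events).replace(**midnight) + timedelta(days=1)

    # Count events per (window start, user).
    counts = Counter(
        (first + ((ts - first) // step) * step, user) for ts, user in events
    )

    # Emit every window in range, including those without any event.
    table = {}
    start = first
    while start < last:
        table[start] = {user: counts[(start, user)] for user in users}
        start += step
    return table

# Sample data from the question.
events = [
    (datetime(2021, 8, 11, 4, 5, 6), "A"),
    (datetime(2021, 8, 11, 4, 15, 6), "B"),
    (datetime(2021, 8, 11, 9, 15, 26), "A"),
    (datetime(2021, 8, 11, 11, 4, 6), "B"),
    (datetime(2021, 8, 11, 14, 55, 16), "A"),
    (datetime(2021, 8, 13, 4, 12, 11), "B"),
]
daily = expected_counts(events, timedelta(days=1))        # 3 windows
half_daily = expected_counts(events, timedelta(hours=12))  # 6 windows
```

For the sample data, `daily` matches the three rows of the 24-hour table and `half_daily` matches the six rows of the 12-hour table in the question.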