Pyspark label values based on time window and double condition match

Time:02-01

The idea is to identify users who bought an item offline ('no' in the 'online' column) after trying to buy the same item online ('yes' in the 'online' column) within the preceding 7 days.

This means that for each 'no' in the 'online' column I need to search the 'yes' rows from the 7 days before it for one that matches the 'user_ID' and 'item' of the 'no' row, then label matches as '1' and non-matches as '0'.

Sample dataset (notes on the right):

+-------+----+------+--------------+
|user_ID|item|online|      datetime|
+-------+----+------+--------------+
|      1|  66|   yes| 02-10-21 9:30|
|      1|  24|   yes| 02-10-21 9:30|
|      1|  66|    no| 06-10-21 9:30| <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      2|  24|    no| 03-10-21 8:01| <- 0 ('user_ID' didn't match in the last 7 days)
|      2|  24|   yes| 03-10-21 8:02|
|      2|  24|    no| 03-10-21 8:03| <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      3|  31|   yes| 05-10-21 8:32|
|      3|  66|   yes|10-10-21 10:57|
|      3|  66|    no|14-10-21 12:07| <- 1 ('user_ID' and 'item' matched in the last 7 days)
|      3|  31|    no|15-10-21 12:07| <- 0 ('item' didn't match in the last 7 days)
+-------+----+------+--------------+

Resulting dataset:

+-------+----+------+--------------+------+
|user_ID|item|online|      datetime|result|
+-------+----+------+--------------+------+
|      1|  66|    no| 06-10-21 9:30|     1|
|      2|  24|    no| 03-10-21 8:01|     0|
|      2|  24|    no| 03-10-21 8:03|     1|
|      3|  66|    no|14-10-21 12:07|     1|
|      3|  31|    no|15-10-21 12:07|     0|
+-------+----+------+--------------+------+
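
To pin the rule down before reaching for Spark, here is a minimal plain-Python sketch (no Spark involved) that applies it by brute force to the sample above. The function name `label_offline_purchases` and the tuple layout are illustrative assumptions, and it reads "the last 7 days" as strictly before the offline purchase and at most 7 x 24 hours back. It is only meant for checking a small sample, not for millions of rows.

```python
from datetime import datetime, timedelta

FMT = "%d-%m-%y %H:%M"  # day-month-year, as in the sample table

# (user_ID, item, online, datetime) rows copied from the sample dataset
rows = [
    (1, 66, "yes", "02-10-21 9:30"),
    (1, 24, "yes", "02-10-21 9:30"),
    (1, 66, "no", "06-10-21 9:30"),
    (2, 24, "no", "03-10-21 8:01"),
    (2, 24, "yes", "03-10-21 8:02"),
    (2, 24, "no", "03-10-21 8:03"),
    (3, 31, "yes", "05-10-21 8:32"),
    (3, 66, "yes", "10-10-21 10:57"),
    (3, 66, "no", "14-10-21 12:07"),
    (3, 31, "no", "15-10-21 12:07"),
]


def label_offline_purchases(rows, days=7):
    """Return (user_ID, item, datetime, result) for every offline ('no') row."""
    parsed = [(u, i, o, datetime.strptime(d, FMT)) for u, i, o, d in rows]
    window = timedelta(days=days)
    labelled = []
    for user, item, online, bought_at in parsed:
        if online != "no":
            continue
        # Look for an online attempt by the same user for the same item that
        # happened before this purchase, at most `days` days earlier.
        matched = any(
            u == user and i == item and o == "yes"
            and bought_at - window <= t < bought_at
            for u, i, o, t in parsed
        )
        labelled.append((user, item, bought_at, int(matched)))
    return labelled


results = [row[-1] for row in label_offline_purchases(rows)]
print(results)  # [1, 0, 1, 1, 0]
```

The printed labels line up with the 'result' column of the resulting dataset, which makes this a handy cross-check for any Spark solution on a small extract.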

I can't figure this one out. Is there a way of doing this with a window function? I'm working with Spark and a large dataset (more than 10 million records).

Any PySpark or SQL magicians who could help?

CodePudding user response:

I've included the pandas code to easily build a sample dataframe. The Spark code starts after that.

It's based on partitioning by user_ID and item and shifting (lagging) the online and datetime values. The idea is that the row immediately before a 'no' for any given user_ID/item needs to be a 'yes' within 7 days to count.

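import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import lag, col, datediff, when
from pyspark.sql.window import Window

# Reuse the active session if there is one (e.g. in a notebook), otherwise create it
spark = SparkSession.builder.getOrCreate()

# Build the sample data in pandas first
df = pd.DataFrame({'user_ID': [1, 1, 1, 2, 2, 2, 3, 3, 3, 3],
                   'item': [66, 24, 66, 24, 24, 24, 31, 66, 66, 31],
                   'online': ['yes', 'yes', 'no', 'no', 'yes', 'no', 'yes', 'yes', 'no', 'no'],
                   'datetime': ['02-10-21 9:30',
                                '02-10-21 9:30',
                                '06-10-21 9:30',
                                '03-10-21 8:01',
                                '03-10-21 8:02',
                                '03-10-21 8:03',
                                '05-10-21 8:32',
                                '10-10-21 10:57',
                                '14-10-21 12:07',
                                '15-10-21 12:07']})

# The sample dates are day-first (dd-mm-yy)
df['datetime'] = pd.to_datetime(df['datetime'], dayfirst=True)

# Convert the pandas dataframe to a Spark dataframe
df = spark.createDataFrame(df)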


# Previous 'online' and 'datetime' values within each user_ID/item group
w = Window.partitionBy('user_ID', 'item').orderBy('datetime')
df = df.withColumn('o_shift', lag('online').over(w))
df = df.withColumn('d_shift', lag('datetime').over(w))

# Whole calendar days between a row and the row before it
df = df.withColumn('t_delta', datediff(col('datetime'), col('d_shift')))

# 1 if an offline purchase directly follows an online attempt made at most 7 days earlier.
# Rows with no previous row have null shifts and fall through to 0.
df = df.withColumn('result', when(
  (col('online') == 'no') &
  (col('o_shift') == 'yes') &
  (col('t_delta') <= 7), 1
).otherwise(0))

# Only the offline purchases are labelled
df = df.filter(col('online') == 'no')

df.sort('user_id', 'datetime').select('user_id', 'item', 'online', 'datetime', 'result').show()

Output

+-------+----+------+-------------------+------+
|user_id|item|online|           datetime|result|
+-------+----+------+-------------------+------+
|      1|  66|    no|2021-10-06 09:30:00|     1|
|      2|  24|    no|2021-10-03 08:01:00|     0|
|      2|  24|    no|2021-10-03 08:03:00|     1|
|      3|  66|    no|2021-10-14 12:07:00|     1|
|      3|  31|    no|2021-10-15 12:07:00|     0|
+-------+----+------+-------------------+------+
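
One detail worth checking against your requirements: Spark's `datediff` compares calendar dates and ignores the time of day, so `t_delta <= 7` is not quite the same as "within 7 x 24 hours". The plain-Python sketch below (no Spark; the two timestamps are made up for illustration) mirrors both definitions to show where they diverge.

```python
from datetime import datetime, timedelta

online_attempt = datetime(2021, 10, 1, 9, 0)     # hypothetical 'yes' row
offline_purchase = datetime(2021, 10, 8, 23, 0)  # hypothetical 'no' row

# Calendar-day difference, which is how datediff counts.
calendar_days = (offline_purchase.date() - online_attempt.date()).days

# Exact elapsed time between the two events.
elapsed = offline_purchase - online_attempt

within_by_calendar = calendar_days <= 7
within_by_elapsed = elapsed <= timedelta(days=7)

print(calendar_days, within_by_calendar)  # 7 True
print(elapsed, within_by_elapsed)         # 7 days, 14:00:00 False
```

If the stricter reading is the one you need, comparing the timestamps in seconds rather than in calendar days is one way to get it.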

CodePudding user response:

You can use a rangeBetween window to look back over the last 7 days.

First cast datetime to a long value (epoch seconds) so that it can be used with rangeBetween.

from pyspark.sql import functions as F, Window
from pyspark.sql.types import IntegerType, LongType

# If datetime is a string
df = df.withColumn('datetime', F.to_timestamp(df.datetime, 'dd-MM-yy H:mm').cast(LongType()))

# If datetime is already a timestamp
df = df.withColumn('datetime', df.datetime.cast(LongType()))

Then define a window that looks back over the past 7 days for the same user_ID and item.

w = (Window.partitionBy(['user_ID', 'item'])
    .orderBy('datetime')
    .rangeBetween(-7*24*60*60, -1)) # -7 days to -1 second before.
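
To see which rows such a frame picks up, here is a plain-Python emulation (no Spark) of a range frame over epoch seconds for a single (user_ID, item) partition. The helpers `to_epoch` and `rows_in_frame` are hypothetical, and the timestamps are those of user 2 in the question, interpreted as UTC for simplicity.

```python
from datetime import datetime, timezone

SEVEN_DAYS = 7 * 24 * 60 * 60  # window length in seconds


def to_epoch(text):
    """Parse a day-first 'dd-mm-yy H:MM' string as UTC and return epoch seconds."""
    parsed = datetime.strptime(text, "%d-%m-%y %H:%M")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())


# One partition: user_ID 2, item 24, already ordered by time.
partition = [
    ("no", to_epoch("03-10-21 8:01")),
    ("yes", to_epoch("03-10-21 8:02")),
    ("no", to_epoch("03-10-21 8:03")),
]


def rows_in_frame(partition, current_ts):
    """Rows whose timestamp lies in [current - 7 days, current - 1 second]."""
    return [
        (online, ts)
        for online, ts in partition
        if current_ts - SEVEN_DAYS <= ts <= current_ts - 1
    ]


frames = [[o for o, _ in rows_in_frame(partition, ts)] for _, ts in partition]
for (online, _), frame in zip(partition, frames):
    print(online, frame)
# no []
# yes ['no']
# no ['no', 'yes']
```

The frame holds earlier rows of both kinds, so whatever is aggregated over it should look only at the 'yes' rows if earlier offline purchases must not count as a match.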

Use this window to count the 'yes' rows in the frame; if there is at least one (count > 0), return 1. Counting only 'yes' rows keeps an earlier offline purchase from being treated as a match. You also only want results for rows where online == 'no', so add the filter at the end.

df = (df.withColumn('result', (F.count(F.when(df.online == 'yes', 1)).over(w) > 0).cast(IntegerType()))
     .filter(df.online == 'no'))