Filter a Scala dataset on an Option[Timestamp] column to return dates that are within n days of the current date

Time:02-06

Let's say I have the following dataset called customers:

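+-----------------------+---+
|lastVisit              |id |
+-----------------------+---+
|2018-08-08 12:23:43.234|11 |
|2021-12-08 14:13:45.4  |12 |
+-----------------------+---+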

The lastVisit field is of type Option[Timestamp].

I want to be able to perform the following...

val filteredCustomers = customers.filter($"lastVisit" > current date - x days)

so that I return all the customers that have a lastVisit date within the last x days.

This is what I have tried so far.

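import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.{LocalDate, ZoneId}
import java.time.temporal.ChronoUnit
import org.apache.spark.sql.functions.{col, udf}

val timeFilter: Timestamp => Long = input => {
   // "MM" is the month pattern; lowercase "mm" would parse minutes
   val sdf = new SimpleDateFormat("yyyy-MM-dd")
   val visitDate = sdf.parse(input.toString).toInstant.atZone(ZoneId.systemDefault()).toLocalDate
   val dateNow = LocalDate.now()
   ChronoUnit.DAYS.between(visitDate, dateNow)
}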

val timeFilterUDF = udf(timeFilter)

val filteredCustomers = customers.withColumn("days", timeFilterUDF(col("lastVisit")))
val filteredCustomers2 = filteredCustomers.filter($"days" < n)
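Since a java.sql.Timestamp already carries its date, the round trip through toString and SimpleDateFormat is not needed. Below is a minimal sketch of the same day count using only java.time, assuming (as the original code does) that the timestamp should be read in the JVM default time zone. daysSinceVisit is a hypothetical helper name, and today is passed in so the function is easy to test:

```scala
import java.sql.Timestamp
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical helper: whole days between the visit's calendar date and `today`.
// Timestamp.toLocalDateTime reads the value in the JVM default time zone.
def daysSinceVisit(lastVisit: Timestamp, today: LocalDate): Long =
  ChronoUnit.DAYS.between(lastVisit.toLocalDateTime.toLocalDate, today)

// Example with the second row from the question's data
val sample = Timestamp.valueOf("2021-12-08 14:13:45.4")
val days = daysSinceVisit(sample, LocalDate.of(2022, 2, 6))
```

Calling it with LocalDate.now() gives the value the original UDF body computes; like the original, it still assumes a non-null timestamp.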

This runs locally, but when I submit it as a Spark job to run on the full table, I get a NullPointerException on the following lines:

   val visitDate = sdf.parse(input.toString).toInstant.atZone(ZoneId.systemDefault()).toLocalDate
   val dateNow = LocalDate.now()

The data looks good, so I am unsure what the problem could be. I also suspect there is a much better way to implement this logic, so any advice would be greatly appreciated. Thank you!
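A likely cause of the NullPointerException: when lastVisit is None, Spark stores null in the column, and a Scala UDF whose parameter is a java.sql.Timestamp receives that null, so input.toString throws. If the local test rows all had values, that would explain why the job only fails on the full table. The sketch below is a null-safe variant, assuming a local Spark session with Scala 2.12/2.13; the rows with ids 13 and 14 and the name daysSinceVisitUdf are hypothetical. Wrapping the argument in Option and returning Option[Long] makes the UDF produce null instead of throwing:

```scala
import java.sql.Timestamp
import java.time.{LocalDate, LocalDateTime}
import java.time.temporal.ChronoUnit
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("null-safe udf sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Sample rows; a None lastVisit is stored as null in the DataFrame column
val customers = Seq(
  (Option(Timestamp.valueOf("2018-08-08 12:23:43.234")), 11),
  (Option(Timestamp.valueOf(LocalDateTime.now().minusDays(2))), 13), // hypothetical recent visit
  (Option.empty[Timestamp], 14)                                      // hypothetical customer with no visit
).toDF("lastVisit", "id")

// Spark passes null for a missing Timestamp, so wrap it in Option before use;
// returning Option[Long] makes the UDF emit null instead of throwing
val daysSinceVisitUdf = udf((ts: Timestamp) =>
  Option(ts).map(t => ChronoUnit.DAYS.between(t.toLocalDateTime.toLocalDate, LocalDate.now()))
)

val n = 10
val recentIds = customers
  .withColumn("days", daysSinceVisitUdf(col("lastVisit")))
  .filter(col("days") < n) // rows with null days do not satisfy the predicate
  .select("id")
  .as[Int]
  .collect()
  .toSet

spark.stop()
```

The built-in date functions in the answers below avoid the UDF entirely and treat nulls the same way, so this variant is mainly useful if the per-row logic has to stay in Scala.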

Answer:

@Xaleate, based on your question, it seems you want to implement the following logic:

current_date - lastVisit < x days
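Treating both values as calendar dates, the condition can be rearranged so that lastVisit is compared against a single threshold date instead of computing a difference per row. Here d stands for the date part of lastVisit and x is an integer number of days:

```latex
\text{current\_date} - d < x \iff d > \text{current\_date} - x
```

The left form is what datediff computes; the right form corresponds to a date_sub threshold. One caveat: comparing the full lastVisit timestamp (rather than its date part) against the threshold date may also keep visits made later on the boundary day itself, so the two approaches can differ by one day at the edge.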

Have you tried Spark's built-in datediff function? Here is a short solution using it:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object LastDateIssue {
  val spark: SparkSession = SparkSession.builder().appName("Last Date issue").master("local[*]").getOrCreate()

  def main(args: Array[String]): Unit = {
    import spark.implicits._
    // prepare customer data for the test
    val customers = Seq(
      ("2018-08-08 12:23:43.234", 11),
      ("2021-12-08 14:13:45.4", 12),
      ("2022-02-01 14:13:45.4", 13))
      .toDF("lastVisit", "id")
    // number of days
    val x: Int = 10

    // datediff(end, start) returns the number of days from start to end;
    // keep rows whose lastVisit is fewer than x days before today
    val recentCustomers = customers.filter(datediff(current_date(), col("lastVisit")) < x)
    recentCustomers.show(20, truncate = false)
    spark.stop()
  }
}

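At the time of writing, this returned only id = 13, since that visit fell within the last 10 days (choose x to suit your needs). Because the filter is relative to current_date(), the result depends on when the job runs: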

+---------------------+---+
|lastVisit            |id |
+---------------------+---+
|2022-02-01 14:13:45.4|13 |
+---------------------+---+

Answer:

Use the built-in date_sub function:

df.filter($"lastVisit" > date_sub(current_date(),n)).show(false)
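A runnable sketch of this approach on data shaped like the question's (a timestamp column built from Option[Timestamp]), assuming a local Spark session with Scala 2.12/2.13; the sample rows and variable names are hypothetical. For a row whose lastVisit is None the comparison evaluates to null, and filter drops it, so no special null handling is needed:

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_date, date_sub}

val spark = SparkSession.builder().appName("date_sub sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical sample data; the None row stands in for a customer with no recorded visit
val df = Seq(
  (Option(Timestamp.valueOf("2018-08-08 12:23:43.234")), 11),
  (Option(Timestamp.valueOf(LocalDateTime.now().minusDays(3))), 13),
  (Option.empty[Timestamp], 14)
).toDF("lastVisit", "id")

val n = 10
// date_sub(current_date(), n) is the date n days ago; rows with a null lastVisit
// make the comparison null and are filtered out
val recent = df.filter($"lastVisit" > date_sub(current_date(), n))
recent.show(false)

val recentIds = recent.select("id").as[Int].collect().toSet
spark.stop()
```

Because the threshold is computed once rather than per row, this form is also easy to read as "visited after date X".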