Let's say I have the following dataset called customers:
| lastVisit | id |
|---|---|
| 2018-08-08 12:23:43.234 | 11 |
| 2021-12-08 14:13:45.4 | 12 |
The lastVisit field is of type Option[Timestamp].
I want to do something like the following:
val filteredCustomers = customers.filter($"lastVisit" > current date - x days)
so that it returns all the customers whose lastVisit is within the last x days.
This is what I have tried so far.
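import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.{LocalDate, ZoneId}
import java.time.temporal.ChronoUnit
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._ // for $"..."; assumes a SparkSession named spark
val timeFilter: Timestamp => Long = input => {
  val sdf = new SimpleDateFormat("yyyy-MM-dd") // MM = month (lowercase mm would be minutes)
  val visitDate = sdf.parse(input.toString).toInstant.atZone(ZoneId.systemDefault()).toLocalDate
  val dateNow = LocalDate.now()
  // whole days between the visit date and today
  ChronoUnit.DAYS.between(visitDate, dateNow)
}
val timeFilterUDF = udf(timeFilter)
val filteredCustomers = customers.withColumn("days", timeFilterUDF(col("lastVisit")))
val filteredCustomers2 = filteredCustomers.filter($"days" < n)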
This runs locally, but when I submit it as a Spark job to run on the full table, I get a NullPointerException on the following lines:
val visitDate = sdf.parse(input.toString).toInstant.atZone(ZoneId.systemDefault()).toLocalDate
val dateNow = LocalDate.now()
The data looks good, so I am unsure what the problem could be. I also imagine there is a much better way to implement this logic; any advice would be greatly appreciated. Thank you!
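One likely cause of the NullPointerException, assuming some customers have no recorded visit: an Option[Timestamp] that is None is stored as null in the DataFrame column, so the UDF receives null and input.toString throws. Below is a minimal null-safe sketch of the same day calculation in plain Scala (no Spark needed to run it); DaysSinceVisit and daysSince are hypothetical names, and the fixed "today" is only there to make the output reproducible. It also converts the Timestamp directly instead of formatting it to a string and re-parsing it.

```scala
import java.sql.Timestamp
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DaysSinceVisit {
  // Null-safe version of the timeFilter body: a null Timestamp (a None in the
  // Option[Timestamp] field) yields None instead of throwing an NPE.
  def daysSince(input: Timestamp, today: LocalDate): Option[Long] =
    Option(input).map { ts =>
      // toLocalDateTime reads the timestamp in the JVM default time zone
      val visitDate = ts.toLocalDateTime.toLocalDate
      ChronoUnit.DAYS.between(visitDate, today)
    }

  def main(args: Array[String]): Unit = {
    val today = LocalDate.of(2022, 2, 5) // hypothetical fixed "today" for reproducibility
    println(daysSince(Timestamp.valueOf("2022-02-01 14:13:45.4"), today)) // Some(4)
    println(daysSince(null, today)) // None
  }
}
```

In Spark, this could be wrapped as udf((ts: Timestamp) => daysSince(ts, LocalDate.now())), which should produce null days for missing visits; the built-in date functions used in the answers avoid a UDF entirely.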
Answer:
@Xaleate, based on your question, it seems you want to implement the logic
current_date - lastVisit < x days
Have you tried the datediff function that is already built into Spark? Here is a short solution using datediff:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object LastDateIssue {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("Last Date issue").master("local[*]").getOrCreate()
    import spark.implicits._
    // prepare customer data for the test (lastVisit is a string here; datediff casts it to a date)
    val customers = Seq(
      ("2018-08-08 12:23:43.234", 11),
      ("2021-12-08 14:13:45.4", 12),
      ("2022-02-01 14:13:45.4", 13)
    ).toDF("lastVisit", "id")
    // number of days
    val x: Int = 10
    // keep rows whose lastVisit date is fewer than x days before today
    val recentCustomers = customers.filter(datediff(current_date(), col("lastVisit")) < x)
    recentCustomers.show(20, truncate = false)
    spark.stop()
  }
}
When this was run, it returned only id = 13, because that was the only visit within the last 10 days. The result depends on the current date, so choose x accordingly:
+---------------------+---+
|lastVisit            |id |
+---------------------+---+
|2022-02-01 14:13:45.4|13 |
+---------------------+---+
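Because current_date() changes every day, the output above is only reproducible on the day it was produced. The following plain-Scala sketch (no Spark) mirrors the datediff filter with a fixed reference date of 2022-02-05, an arbitrary assumption chosen to fall within 10 days of the newest sample visit. dateDiff and recentIds are hypothetical helpers, not Spark APIs; the sketch assumes, like datediff, that only the date part counts and the time of day is ignored.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

object DateDiffSketch {
  // Mimics datediff(end, start) for "yyyy-MM-dd HH:mm:ss..." strings:
  // only the first 10 characters (the date) are used, so the time is ignored.
  def dateDiff(end: LocalDate, start: String): Long =
    ChronoUnit.DAYS.between(LocalDate.parse(start.take(10)), end)

  // Same sample data as the Spark example
  val customers = Seq(
    ("2018-08-08 12:23:43.234", 11),
    ("2021-12-08 14:13:45.4", 12),
    ("2022-02-01 14:13:45.4", 13)
  )

  // Ids whose lastVisit is fewer than x days before `today`
  def recentIds(today: LocalDate, x: Int): Seq[Int] =
    customers.collect { case (lastVisit, id) if dateDiff(today, lastVisit) < x => id }

  def main(args: Array[String]): Unit =
    println(recentIds(LocalDate.of(2022, 2, 5), 10)) // List(13)
}
```

Note that a lastVisit in the future gives a negative difference, so it also passes a datediff(...) < x filter.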
Answer:
Use the date_sub function:
df.filter($"lastVisit" > date_sub(current_date(),n)).show(false)
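To see what this comparison does at the boundary, here is a plain-Scala sketch of the predicate, assuming Spark promotes the date returned by date_sub to a timestamp at midnight (00:00) before comparing it with the timestamp column. DateSubSketch and isRecent are hypothetical names, and 2022-02-05 is an arbitrary fixed "today".

```scala
import java.time.{LocalDate, LocalDateTime}

object DateSubSketch {
  // Mirrors $"lastVisit" > date_sub(current_date(), n), assuming the right-hand
  // date is compared as a timestamp at the start of that day.
  def isRecent(lastVisit: LocalDateTime, today: LocalDate, n: Int): Boolean =
    lastVisit.isAfter(today.minusDays(n.toLong).atStartOfDay())

  def main(args: Array[String]): Unit = {
    val today = LocalDate.of(2022, 2, 5) // fixed reference date for reproducibility
    val visits = Seq(
      LocalDateTime.of(2021, 12, 8, 14, 13, 45),
      LocalDateTime.of(2022, 1, 26, 14, 13, 45),
      LocalDateTime.of(2022, 2, 1, 14, 13, 45)
    )
    // prints each visit with whether it passes the filter for n = 10
    visits.foreach(v => println(s"$v -> ${isRecent(v, today, 10)}"))
  }
}
```

Under that assumption, the cutoff is midnight n days ago, so a visit later on that day (2022-01-26 14:13 above) still passes, whereas datediff(current_date(), col("lastVisit")) < n would exclude it.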
