The following is blowing spark with Task not serializable.
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
mergePayloads(rows, Some(schemaForDataValidation.value))
)
But without passing the option it is working fine:
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
mergePayloads(rows)
)
Where schemaForDataValidation is a broadcasted Map (tried without broadcasting - yields the same error):
lazy val schemaForDataValidation: Broadcast[Map[String, Map[String, Any]]] = getSchemaForValidation
And mergePayloads has the following signature placed in another Object (that extends Serializable) :
object UpdateTableMethods extends Logging with Serializable {
def mergePayloads(iterator: Iterator[Row], schemaOpt: Option[Map[String, Map[String, Any]]] = None): Iterator[String]
I checked the Option class source code. Some is a case class - therefore is serializable and Option itself extends Serializable.
Actually, I have also tried not passing the parameter as an option, but a Map that can be empty/null.
Appreciate your help.
Thank you all.
CodePudding user response:
The solution to this problem: inject the variable in a serizable class with the method that uses it.
val merger = PayloadsMerger(schemaForDataValidationBroadcast.value)
val mergedDF: Dataset[String] = readyToMergeDF
.mapPartitions((rows: Iterator[Row]) =>
merger.merge(rows)
)
Where PayloadsMerger carries the variable and method:
case class PayloadsMerger(expectedSchema: Option[Map[String, Map[String, Any]]]) {
def merge(iterator: Iterator[Row]): Iterator[String] = {
PayloadsMerger.mergePayloads(iterator, expectedSchema)
}
}
Using this kind of clousure technique enables serialization, since scala case classes mixin serializable trait.
