I have written code that parses incoming JSON data against a predefined schema, and the parsing itself works as expected. Now I want to send each record to Right or Left as appropriate; the result is consumed by another ProcessFunction where I make DB calls.
package KafkaAsSource
import KafkaAsSource.JSONParsingExample.{sampleJsonString, schemaJsonString}
import com.fasterxml.jackson.databind.{DeserializationFeature, MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
case class Premium(id: String, premium: Long, eventTime: String)
class Splitter extends ProcessFunction[String, Premium] {
  val outputTag = new OutputTag[String]("failed")
  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(sampleJsonString)
      val validationMessages = schema.validate(parsedJson).asScala
      validationMessages.foreach(msg => println(msg.getMessage))
    } match {
      case Success(x) => {
        println("Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad: " + json)
        Left(json)
      }
    }
  }
  override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
    fromJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}
This gives me errors like:
type mismatch;
found : x.type (with underlying type Unit)
required: T
Right(x)
In this case, whether the data is correct or not, the Success part is always called. Where am I going wrong?
Answer:
Try will only consider something a failure if it throws an exception. If the block passed to Try doesn't throw, it's by definition a Success.
So assuming that your schema.validate(parsedJson) doesn't throw exceptions but instead returns a collection of validation failures, there are a couple of approaches:
- You can stay in Try and throw if there's a failure. One easy way to accomplish this is with require: though it's typically used for expressing preconditions, it's a quick and readable way to throw:
Try {
  // as before
  val validationMessages = schema.validate(parsedJson).asScala
  require(validationMessages.isEmpty)
  parsedJson
} match {
  // as before
}
- You can dispense with Try (especially if you're sure that none of the other code in that block will throw exceptions) and just go straight to Either, with something like:
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(sampleJsonString)
val validationMessages = schema.validate(parsedJson).asScala
if (validationMessages.isEmpty) Right(parsedJson)
else Left(json)
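To compare the two approaches without pulling in Jackson or the networknt validator, here is a minimal, self-contained sketch. Everything in it is illustrative: validate is a hypothetical stand-in for schema.validate(parsedJson).asScala (it returns a list of messages and never throws), and ValidationSketch, viaTry, and viaEither are made-up names.

```scala
import scala.util.{Failure, Success, Try}

object ValidationSketch {
  // Hypothetical stand-in for schema.validate(...): returns messages, never throws.
  def validate(input: String): List[String] =
    if (input.trim.startsWith("{")) Nil
    else List("input is not a JSON object")

  // Approach 1: stay in Try and let require turn validation messages into an exception.
  def viaTry(input: String): Either[String, String] =
    Try {
      val messages = validate(input)
      require(messages.isEmpty, messages.mkString("; "))
      input // the last expression is the value carried by Success
    } match {
      case Success(ok) => Right(ok)
      case Failure(_)  => Left(input)
    }

  // Approach 2: skip Try and branch on the validation result directly.
  def viaEither(input: String): Either[String, String] =
    if (validate(input).isEmpty) Right(input) else Left(input)
}
```

Both functions return Right for input that passes validation and Left carrying the original string otherwise, which is the shape processElement needs to route records to the collector or to the side output. Note that in the real fromJson the value placed in Right would presumably still have to be converted to T (for example a Premium) for the declared return type Either[String, T] to type-check.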
EDIT: note that in your edited code, where you've added
validationMessages.foreach(msg => println(msg.getMessage))
replacing the parsedJson that you had earlier, that changed your Try from a Try[T] to a Try[Unit] which became an Either[String, Unit] after the match.
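The type change comes from the rule that a block evaluates to its last expression. A small sketch using only the standard library (the names LastExpressionSketch, endsWithForeach, and endsWithValue are illustrative, and a plain list stands in for the validation messages):

```scala
import scala.util.Try

object LastExpressionSketch {
  val messages: List[String] = List("a", "b")

  // The block ends with foreach, which returns Unit, so this is a Try[Unit].
  val endsWithForeach: Try[Unit] = Try {
    messages.foreach(m => m.length)
  }

  // The block ends with the value we want to keep, so this is a Try[List[String]].
  val endsWithValue: Try[List[String]] = Try {
    messages.foreach(m => m.length)
    messages
  }
}
```

Keeping the side-effecting foreach is fine as long as the value you want in Success is the final expression of the block.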
