How to pass data in case of success and failure in Scala

Time:01-05

I have written code that parses incoming JSON data and validates it against a predefined schema, and that part works as expected. Now I want to send each record to Right or Left, which are then consumed by another ProcessFunction where I make DB calls.

package KafkaAsSource

import KafkaAsSource.JSONParsingExample.{sampleJsonString, schemaJsonString}
import com.fasterxml.jackson.databind.{DeserializationFeature, MapperFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.OutputTag
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

case class Premium(id: String, premium: Long, eventTime: String)

class Splitter extends ProcessFunction[String,Premium] {
  val outputTag = new OutputTag[String]("failed")

  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
      // You can read a JSON object from String, a file, URL, etc.
      val parsedJson = new ObjectMapper().readTree(sampleJsonString)
      val validationMessages = schema.validate(parsedJson).asScala
      validationMessages.foreach(msg => println(msg.getMessage))
    } match {
      case Success(x) => {
        println(" Good: " + x)
        Right(x)
      }
      case Failure(err) => {
        println("Bad:  " + json)
        Left(json)
      }
    }
  }

  override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
    fromJson(i) match {
      case Right(data) => {
        collector.collect(data)
        println("Good Records: " + data)
      }
      case Left(json) => {
        context.output(outputTag, json)
        println("Bad Records: " + json)
      }
    }
  }
}

This gives me errors like:

type mismatch;
 found   : x.type (with underlying type Unit)
 required: T
        Right(x)  

In this case, the Success branch is always taken, whether or not the data is valid. Where am I going wrong?

Answer:

Try only treats a block as a Failure if it throws a (non-fatal) exception. If the block passed to Try completes without throwing, the result is by definition a Success.
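To make that concrete, here is a small sketch that uses only the standard library. The names `TryDemo`, `collectProblems`, `viaReturn`, and `viaThrow` are hypothetical, and `collectProblems` merely stands in for a validator that returns its messages the way `schema.validate` does. The point is that a Try block which returns errors normally is still a Success.

```scala
import scala.util.{Failure, Success, Try}

object TryDemo {
  // Hypothetical stand-in for schema.validate: it *returns* problems
  // instead of throwing them.
  def collectProblems(input: String): List[String] =
    if (input.nonEmpty) Nil else List("input must not be empty")

  // The block completes normally even when it finds problems, so the result is a Success.
  def viaReturn(input: String): Try[List[String]] =
    Try(collectProblems(input))

  // Only a thrown (non-fatal) exception turns into a Failure.
  def viaThrow(input: String): Try[String] =
    Try {
      if (input.isEmpty) throw new IllegalArgumentException("input must not be empty")
      input
    }

  // Render a Try so the two outcomes are easy to compare.
  def describe[A](t: Try[A]): String = t match {
    case Success(value) => s"Success($value)"
    case Failure(err)   => s"Failure(${err.getMessage})"
  }
}
```

Here `TryDemo.describe(TryDemo.viaReturn(""))` gives `Success(List(input must not be empty))`, while `TryDemo.describe(TryDemo.viaThrow(""))` gives `Failure(input must not be empty)`.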

So, assuming that your schema.validate(parsedJson) doesn't throw exceptions but instead returns a collection of validation messages, there are a couple of approaches:

  • You can stay in Try and throw if there's a failure. One easy way to do this is with require: although it's typically used to express preconditions, it's a quick and readable way to throw an IllegalArgumentException when its condition is false:
Try {
  // as before
  val validationMessages = schema.validate(parsedJson).asScala

  require(validationMessages.isEmpty)
  parsedJson
} match {
  // as before
}
  • You can dispense with Try (especially if you're sure that none of the other code in that block will throw exceptions) and just go straight to Either, with something like:
val schema = JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
// You can read a JSON object from String, a file, URL, etc.
val parsedJson = new ObjectMapper().readTree(json)
val validationMessages = schema.validate(parsedJson).asScala

if (validationMessages.isEmpty) Right(parsedJson)
else Left(json)
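Both approaches can be sketched side by side without Jackson or the schema library. This is an illustration under assumptions: `ValidationDemo` and `validate` are hypothetical, `validate` is a stub that treats anything not starting with `{` as invalid, and the raw string stands in for `parsedJson`.

```scala
import scala.util.{Failure, Success, Try}

object ValidationDemo {
  // Hypothetical stand-in for schema.validate(parsedJson).asScala:
  // it returns validation messages rather than throwing.
  def validate(json: String): Set[String] =
    if (json.trim.startsWith("{")) Set.empty else Set("$: expected an object")

  // Approach 1: stay in Try and let require throw on invalid input.
  def viaRequire(json: String): Either[String, String] =
    Try {
      val messages = validate(json)
      // require throws IllegalArgumentException when the condition is false.
      require(messages.isEmpty, messages.mkString("; "))
      json // stand-in for parsedJson
    } match {
      case Success(valid) => Right(valid)
      case Failure(_)     => Left(json)
    }

  // Approach 2: skip Try and build the Either directly.
  def viaEither(json: String): Either[String, String] = {
    val messages = validate(json)
    if (messages.isEmpty) Right(json) else Left(json)
  }
}
```

In the real code, the require variant also catches exceptions thrown while parsing (for example from readTree on malformed input), whereas the Either variant is more direct when nothing in the block can throw.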

EDIT: note that in your edited code, where you've added

validationMessages.foreach(msg => println(msg.getMessage))

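replacing the parsedJson that you had as the last expression earlier, that changed your Try from a Try[JsonNode] to a Try[Unit] (foreach returns Unit), which became an Either[String, Unit] after the match; hence the "found Unit, required T" error. Even with parsedJson restored, Right(parsedJson) holds a JsonNode rather than a T, so you still need to convert the node to your target type (for example with ObjectMapper.treeToValue) before wrapping it in Right. Also note that your block reads sampleJsonString instead of the json argument, so every call validates the same sample document regardless of its input.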
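A sketch of the typed version, again using only the standard library: for the result to be an Either[String, T], the value inside the Success has to be a T. `TypedDemo` and the `decode` parameter are hypothetical; in the real code, `decode` would be whatever converts the validated JsonNode into a Premium (for instance Jackson's `ObjectMapper.treeToValue`).

```scala
import scala.util.{Failure, Success, Try}

object TypedDemo {
  // Hypothetical `decode` turns the raw input into a T; any non-fatal exception
  // it throws (for example a parse error) becomes Left(json).
  def fromJson[T](json: String, decode: String => T): Either[String, T] =
    Try(decode(json)) match {
      case Success(value) => Right(value)
      case Failure(_)     => Left(json)
    }

  // A block whose last expression is a foreach call has type Unit,
  // so wrapping it in Try gives a Try[Unit], not a Try[T].
  val unitResult: Try[Unit] = Try(List("a", "b").foreach(_ => ()))
}
```

For example, `TypedDemo.fromJson[Int]("42", _.toInt)` is `Right(42)` and `TypedDemo.fromJson[Int]("abc", _.toInt)` is `Left("abc")`.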
