I was able to modify the code and get the tuple in case of Success. How do
I pass the tuple to the Failure part?

    // some processing

    if (!validationMessages.isEmpty) {
      (parsedJson.toString(), validationMessages.foreach(x => {
        val msg: String = x.getMessage
      }))
    } else {
      (parsedJson.toString(), "Good Record...")
    }
  } match {
    case Success(x) => { /* ... */ }
    case Failure(err) => { /* ... */ }
  }

> Thanks, Caizhi, for your explanation. It helped me understand where I
> went wrong.
>> Hi!
>> The last expression in your try block is:
>> if (validationMessages.isEmpty) {
>>   (parsedJson.toString(),
>>     validationMessages.foreach(msg => msg.getMessage.toString))
>> } else {
>>   (parsedJson.toString(), "Format is correct...")
>> }
>> The first branch produces a (String, Unit), because foreach returns Unit,
>> while the second produces a (String, String), so the whole if expression
>> has type (String, Any). However, your parseJson is declared to return
>> Either[String, String], so Right(x) does not type-check.
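
The mismatch described above can be reproduced without Flink or the JSON
validator. The following is only a minimal sketch under stated assumptions:
the object TupleTypeSketch and the methods mixed, describe, and check are
hypothetical names, validation messages are modelled as plain strings, and
widening the return type to Either[(String, String), (String, String)] is
just one possible way to carry the record and its message on both sides.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-alone sketch; none of these names come from the thread.
object TupleTypeSketch {

  // One branch is (String, Unit), since foreach returns Unit, and the other is
  // (String, String), so the result cannot be typed as (String, String).
  def mixed(record: String, messages: Seq[String]): (String, Any) =
    if (messages.nonEmpty) (record, messages.foreach(_ => ()))
    else (record, "Good Record...")

  // mkString returns a String, so both branches are (String, String).
  def describe(record: String, messages: Seq[String]): (String, String) =
    if (messages.nonEmpty) (record, messages.mkString("; "))
    else (record, "Good Record...")

  // Assumption: Left carries (record, reason), Right carries (record, status).
  def check(record: String, messages: Seq[String])
      : Either[(String, String), (String, String)] =
    Try {
      require(record.nonEmpty, "empty record")
      describe(record, messages)
    } match {
      case Success((rec, msgs)) if messages.nonEmpty => Left((rec, msgs))
      case Success(ok)                               => Right(ok)
      case Failure(err)                              => Left((record, err.getMessage))
    }
}
```

With a shape like this, the caller can pattern match on Left((record, reason))
and Right((record, status)) and decide separately what to collect and what to
send to the side output.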
>>> I have written a process function that parses JSON. If a record does not
>>> match the expected format, it is passed as a Failure to the process
>>> function, and I print the records; that part works fine. Now I am trying
>>> to print both the message and the record for Success and for Failure. I
>>> implemented the code below and it gave me an error. What exactly am I
>>> missing?
>>> package KafkaAsSource
>>> import com.fasterxml.jackson.databind.ObjectMapper
>>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>>> import org.apache.flink.api.scala.createTypeInformation
>>> import org.apache.flink.streaming.api.functions.ProcessFunction
>>> import org.apache.flink.streaming.api.scala.OutputTag
>>> import org.apache.flink.util.Collector
>>> import scala.jdk.CollectionConverters._
>>> import scala.util.{Failure, Success, Try}
>>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>>   val outputTag = new OutputTag[String]("failed")
>>>   def parseJson(json: String): Either[String, String] = {
>>>     val schemaJsonString =
>>>       """
>>> {
>>>     "$schema": "http://json-schema.org/draft-04/schema#",
>>>     "title": "Product",
>>>     "description": "A product from the catalog",
>>>     "type": "object",
>>>     "properties": {
>>>         "id": {
>>>             "description": "The unique identifier for a product",
>>>             "type": "integer"
>>>         },
>>>         "premium": {
>>>             "description": "Annual Premium",
>>>             "type": "integer"
>>>         },
>>>         "eventTime": {
>>>             "description": "Timestamp at which record has arrived at source / generated",
>>>             "type": "string"
>>>         }
>>>     },
>>>     "required": ["id", "premium","eventTime"]
>>> }
>>> """
>>>     Try {
>>>       val schema = JsonSchemaFactory
>>>         .getInstance(SpecVersion.VersionFlag.V4)
>>>         .getSchema(schemaJsonString)
>>>       // You can read a JSON object from String, a file, URL, etc.
>>>       val parsedJson = new ObjectMapper().readTree(json)
>>>       val validationMessages = schema.validate(parsedJson).asScala
>>>       //validationMessages.foreach(msg => println(msg.getMessage))
>>>       require(validationMessages.isEmpty)
>>>       //parsedJson.toString()
>>>       if (validationMessages.isEmpty) {
>>>         (parsedJson.toString(),
>>>           validationMessages.foreach(msg => msg.getMessage.toString))
>>>       } else {
>>>         (parsedJson.toString(), "Format is correct...")
>>>       }
>>>     }
>>>     match {
>>>       case Success(x) => {
>>>         println("Good: " + x)
>>>         Right(x)
>>>       }
>>>       case Failure(err) => {
>>>         println("Bad:  " + json)
>>>         Left(json)
>>>       }
>>>     }
>>>   }
>>>   override def processElement(
>>>       i: String,
>>>       context: ProcessFunction[String, String]#Context,
>>>       collector: Collector[String]): Unit = {
>>>     parseJson(i) match {
>>>       case Right(data) => {
>>>         collector.collect(data)
>>>         println("Good Records: " + data)
>>>       }
>>>       case Left(json) => {
>>>         context.output(outputTag, json)
>>>         println("Bad Records: " + json)
>>>       }
>>>     }
>>>   }
>>> }
>>> Error:
>>> type mismatch;
>>>  found   : (String, Any)
>>>  required: String
>>>         Right(x)

