Thanks, Caizhi for your explanation. It helped me to understand where I
went wrong.

On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> The last expression in your try block is
>
> if(validationMessages.isEmpty) {
>   (parsedJson.toString(),
> validationMessages.foreach((msg=>msg.getMessage.toString)))
> } else {
>   (parsedJson.toString(), "Format is correct...")
> }
>
> The first one produces a (String, Unit) type while the second one produces
> a (String, String) type, so the whole if expression produces (String, Any)
> type. However your parseJson should return Either[String, String], thus
> causing this issue.
>
>
> Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> 于2022年1月5日周三 19:04写道:
>
>> I have written a process function where I am parsing the JSON and if it
>> is not according to the expected format it passes as Failure to the process
>> function and I print the records which are working fine. Now, I was trying
>> to print the message and the record in case of Success and Failure. I
>> implemented the below code and it gave me the error. What exactly I am
>> missing?
>>
>> package KafkaAsSource
>>
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>> import org.apache.flink.api.scala.createTypeInformation
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.streaming.api.scala.OutputTag
>> import org.apache.flink.util.Collector
>> import scala.jdk.CollectionConverters._
>> import scala.util.{Failure, Success, Try}
>>
>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>   val outputTag = new OutputTag[String]("failed")
>>
>>   def parseJson(json: String): Either[String, String] = {
>>     val schemaJsonString =
>>       """
>> {
>>     "$schema": "http://json-schema.org/draft-04/schema#";,
>>     "title": "Product",
>>     "description": "A product from the catalog",
>>     "type": "object",
>>     "properties": {
>>         "id": {
>>             "description": "The unique identifier for a product",
>>             "type": "integer"
>>         },
>>         "premium": {
>>             "description": "Annual Premium",
>>             "type": "integer"
>>         },
>>         "eventTime": {
>>             "description": "Timestamp at which record has arrived at source 
>> / generated",
>>             "type": "string"
>>         }
>>     },
>>     "required": ["id", "premium","eventTime"]
>> }
>> """
>>     Try {
>>       val schema = 
>> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>       // You can read a JSON object from String, a file, URL, etc.
>>       val parsedJson = new ObjectMapper().readTree(json)
>>       val validationMessages = schema.validate(parsedJson).asScala
>>       //validationMessages.foreach(msg => println(msg.getMessage))
>>       require(validationMessages.isEmpty)
>>       //parsedJson.toString()
>>       if(validationMessages.isEmpty)
>>         {
>>           
>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>>         }
>>       else
>>         {
>>           (parsedJson.toString(),"Format is correct...")
>>         }
>>
>>     }
>>     match {
>>       case Success(x) => {
>>         println("Good: " + x)
>>         Right(x)
>>       }
>>       case Failure(err) => {
>>         println("Bad:  " + json)
>>         Left(json)
>>       }
>>     }
>>   }
>>   override def processElement(i: String, context: ProcessFunction[String, 
>> String]#Context, collector: Collector[String]): Unit = {
>>     parseJson(i) match {
>>       case Right(data) => {
>>         collector.collect(data)
>>         println("Good Records: " + data)
>>       }
>>       case Left(json) => {
>>         context.output(outputTag, json)
>>         println("Bad Records: " + json)
>>       }
>>     }
>>   }
>> }
>>
>>
>> Error:
>>
>> type mismatch;
>>  found   : (String, Any)
>>  required: String
>>         Right(x)
>>
>>

Reply via email to