Thanks, Caizhi, for your explanation. It helped me understand where I went wrong.
On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> The last expression in your try block is
>
> if(validationMessages.isEmpty) {
>   (parsedJson.toString(),
>   validationMessages.foreach((msg=>msg.getMessage.toString)))
> } else {
>   (parsedJson.toString(), "Format is correct...")
> }
>
> The first one produces a (String, Unit) type while the second one produces
> a (String, String) type, so the whole if expression produces (String, Any)
> type. However your parseJson should return Either[String, String], thus
> causing this issue.
>
>
> Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> wrote on Wed, Jan 5, 2022 at 19:04:
>
>> I have written a process function where I am parsing the JSON, and if it
>> is not according to the expected format it passes as Failure to the process
>> function and I print the records, which is working fine. Now, I was trying
>> to print the message and the record in case of Success and Failure. I
>> implemented the below code and it gave me the error. What exactly am I
>> missing?
>>
>> package KafkaAsSource
>>
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion}
>> import org.apache.flink.api.scala.createTypeInformation
>> import org.apache.flink.streaming.api.functions.ProcessFunction
>> import org.apache.flink.streaming.api.scala.OutputTag
>> import org.apache.flink.util.Collector
>> import scala.jdk.CollectionConverters._
>> import scala.util.{Failure, Success, Try}
>>
>> class JSONParsingProcessFunction extends ProcessFunction[String,String] {
>>   val outputTag = new OutputTag[String]("failed")
>>
>>   def parseJson(json: String): Either[String, String] = {
>>     val schemaJsonString =
>>       """
>>       {
>>         "$schema": "http://json-schema.org/draft-04/schema#",
>>         "title": "Product",
>>         "description": "A product from the catalog",
>>         "type": "object",
>>         "properties": {
>>           "id": {
>>             "description": "The unique identifier for a product",
>>             "type": "integer"
>>           },
>>           "premium": {
>>             "description": "Annual Premium",
>>             "type": "integer"
>>           },
>>           "eventTime": {
>>             "description": "Timestamp at which record has arrived at source / generated",
>>             "type": "string"
>>           }
>>         },
>>         "required": ["id", "premium","eventTime"]
>>       }
>>       """
>>     Try {
>>       val schema =
>>         JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString)
>>       // You can read a JSON object from String, a file, URL, etc.
>>       val parsedJson = new ObjectMapper().readTree(json)
>>       val validationMessages = schema.validate(parsedJson).asScala
>>       //validationMessages.foreach(msg => println(msg.getMessage))
>>       require(validationMessages.isEmpty)
>>       //parsedJson.toString()
>>       if(validationMessages.isEmpty)
>>       {
>>         (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString)))
>>       }
>>       else
>>       {
>>         (parsedJson.toString(),"Format is correct...")
>>       }
>>
>>     }
>>     match {
>>       case Success(x) => {
>>         println("Good: " + x)
>>         Right(x)
>>       }
>>       case Failure(err) => {
>>         println("Bad: " + json)
>>         Left(json)
>>       }
>>     }
>>   }
>>
>>   override def processElement(i: String, context: ProcessFunction[String, String]#Context, collector: Collector[String]): Unit = {
>>     parseJson(i) match {
>>       case Right(data) => {
>>         collector.collect(data)
>>         println("Good Records: " + data)
>>       }
>>       case Left(json) => {
>>         context.output(outputTag, json)
>>         println("Bad Records: " + json)
>>       }
>>     }
>>   }
>> }
>>
>>
>> Error:
>>
>> type mismatch;
>>  found   : (String, Any)
>>  required: String
>>     Right(x)
>>
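
The type issue described in the quoted explanation can be reproduced without Flink or Jackson. The sketch below is only an illustration under stated assumptions: `TypeFixSketch`, `widened`, and `validate` are hypothetical names, and `validate` is a simplified stand-in for `schema.validate(parsedJson)` that returns plain strings. It shows the if expression widening to `(String, Any)`, and one possible way to keep `parseJson` returning `Either[String, String]` by making the `Try` block yield a plain `String`.

```scala
import scala.util.{Failure, Success, Try}

object TypeFixSketch {
  // Hypothetical stand-in for schema.validate(parsedJson): returns error messages.
  def validate(json: String): Seq[String] = {
    val trimmed = json.trim
    if (trimmed.startsWith("{") && trimmed.endsWith("}")) Seq.empty
    else Seq("input is not a JSON object")
  }

  // Mirrors the shape of the quoted if expression: foreach returns Unit,
  // so the two branches only agree on (String, Any).
  def widened(msgs: Seq[String]): (String, Any) =
    if (msgs.isEmpty) ("record", msgs.foreach(m => m.toString))
    else ("record", "Format is correct...")

  // One possible fix: the Try block yields a String, so Success carries a String.
  def parseJson(json: String): Either[String, String] =
    Try {
      val messages = validate(json)
      // mkString produces a String; it becomes the exception message on failure.
      require(messages.isEmpty, messages.mkString("; "))
      json
    } match {
      case Success(ok) => Right(ok)
      case Failure(_)  => Left(json)
    }
}
```

With this shape, `TypeFixSketch.parseJson("{\"id\": 1}")` gives a `Right` holding the input, and `TypeFixSketch.parseJson("oops")` gives a `Left` holding the input. If both the record and a message are needed downstream, changing the return type to something like `Either[(String, String), String]` would be another option, though that also means changing the `match` in `processElement`.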