I was able to modify the code and get the tuple in case of Success. How do I pass the tuple to the Failure part?
try { // //some processing if (!validationMessages.isEmpty) { (parsedJson.toString(), validationMessages.foreach(x => { val msg: String = x.getMessage msg }).toString()) } else { (parsedJson.toString(), "Good Record...") } } match { case Success(x) => { Right(x) } case Failure(err) => { Left(json) } } On Thu, Jan 6, 2022 at 1:43 PM Siddhesh Kalgaonkar < kalgaonkarsiddh...@gmail.com> wrote: > Thanks, Caizhi for your explanation. It helped me to understand where I > went wrong. > > On Thu, Jan 6, 2022 at 7:37 AM Caizhi Weng <tsreape...@gmail.com> wrote: > >> Hi! >> >> The last expression in your try block is >> >> if(validationMessages.isEmpty) { >> (parsedJson.toString(), >> validationMessages.foreach((msg=>msg.getMessage.toString))) >> } else { >> (parsedJson.toString(), "Format is correct...") >> } >> >> The first one produces a (String, Unit) type while the second one >> produces a (String, String) type, so the whole if expression produces >> (String, Any) type. However your parseJson should return Either[String, >> String], thus causing this issue. >> >> >> Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> 于2022年1月5日周三 19:04写道: >> >>> I have written a process function where I am parsing the JSON and if it >>> is not according to the expected format it passes as Failure to the process >>> function and I print the records which are working fine. Now, I was trying >>> to print the message and the record in case of Success and Failure. I >>> implemented the below code and it gave me the error. What exactly I am >>> missing? >>> >>> package KafkaAsSource >>> >>> import com.fasterxml.jackson.databind.ObjectMapper >>> import com.networknt.schema.{JsonSchemaFactory, SpecVersion} >>> import org.apache.flink.api.scala.createTypeInformation >>> import org.apache.flink.streaming.api.functions.ProcessFunction >>> import org.apache.flink.streaming.api.scala.OutputTag >>> import org.apache.flink.util.Collector >>> import scala.jdk.CollectionConverters._ >>> import scala.util.{Failure, Success, Try} >>> >>> class JSONParsingProcessFunction extends ProcessFunction[String,String] { >>> val outputTag = new OutputTag[String]("failed") >>> >>> def parseJson(json: String): Either[String, String] = { >>> val schemaJsonString = >>> """ >>> { >>> "$schema": "http://json-schema.org/draft-04/schema#", >>> "title": "Product", >>> "description": "A product from the catalog", >>> "type": "object", >>> "properties": { >>> "id": { >>> "description": "The unique identifier for a product", >>> "type": "integer" >>> }, >>> "premium": { >>> "description": "Annual Premium", >>> "type": "integer" >>> }, >>> "eventTime": { >>> "description": "Timestamp at which record has arrived at source >>> / generated", >>> "type": "string" >>> } >>> }, >>> "required": ["id", "premium","eventTime"] >>> } >>> """ >>> Try { >>> val schema = >>> JsonSchemaFactory.getInstance(SpecVersion.VersionFlag.V4).getSchema(schemaJsonString) >>> // You can read a JSON object from String, a file, URL, etc. >>> val parsedJson = new ObjectMapper().readTree(json) >>> val validationMessages = schema.validate(parsedJson).asScala >>> //validationMessages.foreach(msg => println(msg.getMessage)) >>> require(validationMessages.isEmpty) >>> //parsedJson.toString() >>> if(validationMessages.isEmpty) >>> { >>> >>> (parsedJson.toString(),validationMessages.foreach((msg=>msg.getMessage.toString))) >>> } >>> else >>> { >>> (parsedJson.toString(),"Format is correct...") >>> } >>> >>> } >>> match { >>> case Success(x) => { >>> println("Good: " + x) >>> Right(x) >>> } >>> case Failure(err) => { >>> println("Bad: " + json) >>> Left(json) >>> } >>> } >>> } >>> override def processElement(i: String, context: ProcessFunction[String, >>> String]#Context, collector: Collector[String]): Unit = { >>> parseJson(i) match { >>> case Right(data) => { >>> collector.collect(data) >>> println("Good Records: " + data) >>> } >>> case Left(json) => { >>> context.output(outputTag, json) >>> println("Bad Records: " + json) >>> } >>> } >>> } >>> } >>> >>> >>> Error: >>> >>> type mismatch; >>> found : (String, Any) >>> required: String >>> Right(x) >>> >>>