Hello, I am using Apache Spark 2.2.1 with Scala. I am trying to load below JSON from Kafka and trying to extract "JOBTYPE" and "LOADID" from the nested JSON object. Need help with extraction logic.
Code val workRequests = new StructType().add("after", new StructType() .add("JOBTYPE", StringType) .add("LOADID", StringType)) val dfn = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "zlp21299.vci.att.com:29092,zlp21301.vci.att.com:29093,zlp21318.vci.att.com:29094") .option("subscribe", "edgemw") .load() val df2 = dfn.selectExpr("CAST(value AS STRING)").as[(String)] .select(from_json($"value", workRequests)).as("data") .select("data.*") df2.writeStream .format("console") .option("truncate","false") .option("numRows", 10) .start() .awaitTermination() Desired Output JOBTYPE | LOADID ---------------------- Val 1 | Val 1 Val 2 | Val 2 JSON {"table":"FORCEMW.XBFWR_T","op_type":"U","op_ts":"2018-04-03 15:10:35.036322","current_ts":"2018-04-03T15:10:39.269000","pos":"00000296150084924838","before":{},"after":{"WRID":93694148,"LOADID":"OH_OHIO_I","SOURCETYPE":"RWG","EXTKEY1":"S0010","WORKTYPE":"RT","WORKTYPEQUALIFIER":"IN","TIMESTAMP":"2018-04-03:12:41:49","EXTKEY2":"NWCMOH49 S0010 00130 ","WRSTAT":"ASSIGN","WRSTATQUAL":null,"JOBTYPE":"S0010","ESTDURATION":"28","ESTSETUP":"0","PRIORITY":"55","LOADSTARTDATETIME":"2018-04-01:11:00:00","LOADENDDATETIME":"2018-04-11:03:59:00","ASSIGNDATE":"2018-04-03:00:00:00","COMMITMENTDATETIME":"2018-05-01:03:59:00","LATITUDE":40.273643,"LONGITUDE":-81.607832,"ASSIGNCOORDHORIZ":null,"ASSIGNCOORDVERT":null,"ACCESSBEGINDATETIME":null,"ACCESSENDDATETIME":null,"DISPATCH2AREA":"N","LATESTSTARTDATETIME":null,"TECHPREFERRED":null,"OWNERID":"Force","TECHASSIGNED":"MM4341","UNSCHEDULED":"N","UNLOCATED":"N","ACCESSCODE":null,"COMPCANDATETIME":null,"APPOINTMENTDATETIME":null,"CLOSEOUTVALIDATIONRQD":null,"CUSTADVISEDNAMERQD":null,"CUSTADVISEDREACHNBRRQD":null,"DISPATCHDATE":null,"DISPATCHDATECLASS":null,"DMNAENDDATETIME1":null,"DMNAENDDATETIME2":null,"DMNAFLAG1":null,"DMNAFLAG2":null,"DMNASTARTDATETIME1":null,"DMNASTARTDATETIME2":null,"ESTORIGDURATION":"28","MANUALLOCKDATETIME":null,"MANUALLOCKEDFLAG":"N","MISSINGDESIGNDATE":null,"PCTCOMPLETE":0,"TASKDURATIONAREANAME":null,"TRAVELTIMEAREANAME":null,"SUMMARYRQD":null,"TRAVELDATETIMEFACTOR":0,"UPDATEDWHILEDISPATCHED":"N","VISITFLAG":"Y","EXTDESTINATION":null,"LOCQUALIFIER":"001","ROUTETOCODE":null,"AUXSTATUS":null,"CURRENTSOURCESTATUS":null,"LASTDISPATCHSOURCESTATUS":null,"LASTSOURCESTATUSDATETIME":null,"APPOINTMENTWINDOWEND":null,"VERSIONNBR":"9","SEARCHDATE":null,"AUXSTATQUAL":null,"EXCEEDPRIORITYTHRES":"N","CANCELORDER":null,"CIRCUITWORKLOCCLLI":"NWCMOH49","JOBTYPEREQUIRESAPPROVAL":"N","APPROVEDFORDISPATCH":null,"WIT":"W","MINIMUMASSIGNTIME":null,"RTNTYPE":"PM","RTNBEGINTIME":null,"RTNENDTIME":null,"ASSIGNABLEFLAG":"Y","DUEDATEFLAG":"N","GROUPKEY":null,"PRIORITIZATIONAREA":null,"ORDDUEDATE":null,"SAMELOCATIONINDICATOR":"Y","ESTIMATEDSTARTDATETIME":"2018-04-03:18:40:45","ESTIMATEDCOMPLETIONDATETIME":"2018-04-03:18:54:45","OBJECTTIMEZONE":"EASTERN","LINKKEY":null,"CUSTNAME":null,"SERVICEADDRSTRING":null,"RESERVATIONID":null,"RESERVATIONSTATQUAL":null,"RSVTECHASSIGNED":null,"RSVASSIGNDATE":null,"RSVMATCHFLAG":null,"RSVCNTFLAG":null,"RSVOVRBKFLAG":"N","RSVDETAILS":null,"REISSUEFLAG":"Y","OOSINDICATOR":null,"DELAYDISPATCHFLAG":null,"INTOWFLAG":null,"HELPERFLAG":null,"NUMTASKSCOMPLETED":null,"HONORFUTUREACCESSFLAG":"N","CALLOUTASSIGNFLAG":null,"TECHLOADTYPE":null,"OVERRIDELOADPARAMSIND":"N","LOCATIONPROXIMITYCODE":null,"OOSMEMBERCOUNT":null,"PMAFLAG":null,"OVERRIDEEVALUATEIND":null,"REPORTEDDATETIME":null,"IMMEDDISPIND":null,"RELATEDTROUBLEREPORT":null,"DISPCOORDFLAG":null,"CUSTEMAIL":null,"CUSTNOTIFY":null,"ESTIMATEDARRIVALDATETIME":null,"DISPCOORDKEY":null,"DISPCOORDSEQ":null,"SOURCEQUALIFIER":null,"APDFIELDLOCKS":0,"RESTRICTACCESSCANDIDATEFLAG":null,"ALAPINDICATOR":null,"ENTRYDATETIME":"2018-03-19:18:02:29","JEPRELEASEDATE":null,"JOBLISTDATA":null,"STEPSEQNBR":null,"CMDCNTRLFLAGS":null}} ============================================================================================================================ Disclaimer: This message and the information contained herein is proprietary and confidential and subject to the Tech Mahindra policy statement, you may review the policy at http://www.techmahindra.com/Disclaimer.html <http://www.techmahindra.com/Disclaimer.html> externally http://tim.techmahindra.com/tim/disclaimer.html <http://tim.techmahindra.com/tim/disclaimer.html> internally within TechMahindra. ============================================================================================================================