Any window that was not complete by the time the operator died is, by definition, not replayed (we don't have all the data for that window), and the output operators should not expect it to be. In your case, if the operator died during window ..24, then on restart you can expect the input operator, through its window data manager, to replay all windows from the checkpoint up to and including the window before the failure, in an idempotent fashion, but not the window during which the failure happened. Also, in idempotent replay the window is the unit of replay: the exact data within each window is replayed, but the order is generally not guaranteed, because with partitioning the data can arrive at the output operators in a different order than in the previous run. The exactly-once output operators in the library generally understand and work with these semantics, so I'm not sure why the Kafka output operator is reporting an exactly-once violation for an incomplete window. Maybe somebody who is well versed with the output operator code can comment?
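The replay contract described above can be sketched in plain Java. This is a hypothetical illustration, not Apex or Malhar library code: `ReplayContract`, `windowsToReplay` and `sameContents` are names made up for the sketch, and it assumes that a checkpoint taken at window C holds state as of the end of C, so replay resumes at C + 1. It shows which window ids an idempotent input replays when the failure happens during window F, and how an output operator could compare a replayed window with the original run without depending on tuple order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the replay contract; not Apex/Malhar library code.
class ReplayContract {

    // Window ids an idempotent input operator is expected to replay after recovery.
    // Assumption: the checkpoint at checkpointWindowId holds state as of the end of that
    // window, so replay starts at the next window and stops before failedWindowId,
    // the window that was incomplete when the operator died.
    static List<Long> windowsToReplay(long checkpointWindowId, long failedWindowId) {
        List<Long> ids = new ArrayList<>();
        for (long id = checkpointWindowId + 1; id < failedWindowId; id++) {
            ids.add(id);
        }
        return ids;
    }

    // Order-insensitive comparison of a window's tuples across runs: the replay unit is
    // the window, so the same tuples must come back, possibly in a different order.
    static boolean sameContents(List<String> originalRun, List<String> replayedRun) {
        return countTuples(originalRun).equals(countTuples(replayedRun));
    }

    // Multiset view of a window: tuple -> number of occurrences.
    private static Map<String, Integer> countTuples(List<String> tuples) {
        Map<String, Integer> counts = new HashMap<>();
        for (String t : tuples) {
            counts.merge(t, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Small ids for readability; real window ids are large 64-bit values.
        System.out.println(windowsToReplay(20, 24));                                        // [21, 22, 23]
        System.out.println(sameContents(List.of("a", "b", "b"), List.of("b", "a", "b"))); // true
        System.out.println(sameContents(List.of("a", "b"), List.of("a")));                 // false
    }
}
```

Counting occurrences rather than comparing plain sets keeps duplicate tuples within a window from being collapsed into one.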
Thanks

On Tue, Mar 20, 2018 at 6:16 PM, Vivek Bhide <vivek.bh...@target.com> wrote:
> Hi Pramod,
>
> We did some more research by adding more logging to the KafkaInput operator,
> and below are our findings.
>
> Application setup:
> 1. WindowDataManager for KafkaInputOperator is set to FSWindowDataManager
> 2. Streaming window for the application is changed from 0.5 seconds to
> 5 seconds to make the issue easier to reproduce
> 3. Created 2 custom classes for the input and output operators, only for the
> purpose of adding debug logs
>
> Logs for KafkaIn before operator failure:
> --------------------------------------------
> 2018-03-20 19:36:49,494 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
> 2018-03-20 19:36:49,599 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
> 2018-03-20 19:36:54,496 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
> 2018-03-20 19:36:54,578 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771824 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
>
> Logs of the KafkaIn after recovery:
> ------------------------------------
> CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771822 : 48
> 2018-03-20 19:37:06,664 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771823 : {"id":97,"name":"RWOSFMVV0MY7OIXGV2XD"}
> 2018-03-20 19:37:06,665 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771823 : 48
> 2018-03-20 19:37:06,720 INFO util.AsyncFSStorageAgent (AsyncFSStorageAgent.java:save(91)) - using /grid/5/hadoop/yarn/local/usercache/SVDATHDP/appcache/application_1519410901484_172884/container_e3125_1519410901484_172884_01_000005/tmp/chkp4360474156134593331 as the basepath for checkpointing.
> 2018-03-20 19:37:06,727 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771824 : 0
> 2018-03-20 19:37:06,768 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:endWindow(24)) - Total tuples processed in window 6535189514237771825 : 0
> 2018-03-20 19:37:06,810 INFO kafkaoutputdedup.CustomKafkaSinglePortInputOperator (CustomKafkaSinglePortInputOperator.java:emitTuple(33)) - First tuple in window 6535189514237771826 : {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}
>
> Logs of the KafkaOutput operator:
> -----------------------------------
>
> 2018-03-20 19:37:06,616 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771822
> 2018-03-20 19:37:06,617 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(203)) - Rebuild the partial window after 6535189514237771823
> 2018-03-20 19:37:07,943 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:rebuildPartialWindow(304)) - Partitial Window tuples : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
> 2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771823
> 2018-03-20 19:37:07,944 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(110)) - Current Window : 6535189514237771824
> 2018-03-20 19:37:07,945 INFO kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator (CustomKafkaSinglePortExatclyOnceOutputOperator.java:endWindow(116)) - Partitial window content : {id=145, name=GTNQLMEVGRWRHZQANCVM, randomVar=10=1, id=147, name=RVRY4ERRU7UR26J9EL3F, randomVar=10=1, id=148, name=6LE2ZNZ4Z0S2TGJWO1JW, randomVar=10=1, id=149, name=PPR4FS85MTMT6WZFSICS, randomVar=10=1, id=146, name=YCZ2QKLYEJN8ZNW1LAIT, randomVar=10=1}
> 2018-03-20 19:37:07,946 ERROR engine.StreamingContainer (StreamingContainer.java:run(1456)) - Operator set [OperatorDeployInfo[id=3,name=kafkaOutputOperator,type=GENERIC,checkpoint={5ab1a83d00000029, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=output,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
> java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
>     at com.tgt.outputdeduptest.kafkaoutputdedup.CustomKafkaSinglePortExatclyOnceOutputOperator.endWindow(CustomKafkaSinglePortExatclyOnceOutputOperator.java:117)
>     at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
>     at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
> 2018-03-20 19:37:07,964 INFO producer.KafkaProducer (KafkaProducer.java:close(613)) - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2018-03-20 19:37:08,515 INFO engine.StreamingContainer (StreamingContainer.java:processHeartbeatResponse(808)) - Undeploy request: [3]
>
>
> If you compare the KafkaIn logs before and after the failure, the last window
> the operator completed is 6535189514237771823, and it was killed while
> processing 6535189514237771824. You can also see that the first tuple in
> window 6535189514237771824 is {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
> When the operator recovers, it correctly replays the tuples up to
> 6535189514237771823, but then it sends 0 tuples for windows
> 6535189514237771824 and 6535189514237771825, and then sends all the
> accumulated tuples in 6535189514237771826, with the first tuple being
> {"id":145,"name":"GTNQLMEVGRWRHZQANCVM"}.
>
> As per my understanding this is not idempotent behavior, since the assignment
> of tuples to windows changed after recovery. Please correct me if I am
> wrong. We believe this is causing the output operator to fail, because we see
> that it correctly recovers the partially processed window
> 6535189514237771824 (please refer to the logs). We also verified this by
> adding a consumer on the output topic.
>
> Could you please confirm whether this is an issue that needs a fix, and
> suggest one if possible?
>
> Regards
> Vivek
>
>
>
> --
> Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
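To make Vivek's observation concrete, here is a hypothetical, simplified model of a partial-window check like the one that threw "Violates Exactly once" in the logs above. It is not the code of the Kafka exactly-once output operator or of the custom class in the stack trace; `PartialWindowCheck` and `missingAfterReset` are illustrative names, and the rule it encodes is an assumption: every tuple already written for the partial window before the failure must reappear in the replayed window with the same id, in any order. The tuple ids 145 to 149 and the empty window ...824 after recovery are taken from the logs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of a partial-window exactly-once check.
// Not the actual output operator implementation; names are illustrative only.
class PartialWindowCheck {

    // Tuples the output operator had already written in the partial window before the
    // failure that did not reappear in the replayed window with the same window id.
    // Order is ignored, since idempotent replay only guarantees the window's contents.
    static Set<Integer> missingAfterReset(Set<Integer> partialBeforeFailure, List<Integer> replayedSameWindow) {
        Set<Integer> missing = new HashSet<>(partialBeforeFailure);
        missing.removeAll(replayedSameWindow);
        return missing;
    }

    public static void main(String[] args) {
        // Tuple ids rebuilt from the output topic for partial window ...824
        // (the "Partitial Window tuples" log line).
        Set<Integer> partial = Set.of(145, 146, 147, 148, 149);

        // After recovery the input emitted 0 tuples in window ...824 (KafkaIn logs),
        // so every tuple of the partial window is reported missing.
        List<Integer> replayed824 = List.of();
        System.out.println("missing in ...824: " + missingAfterReset(partial, replayed824).size());

        // An idempotent replay of ...824 that only reorders the tuples passes the check.
        List<Integer> reordered = List.of(147, 145, 149, 146, 148);
        System.out.println("missing if replayed reordered: " + missingAfterReset(partial, reordered).size());
    }
}
```

Running `main` prints `missing in ...824: 5` and then `missing if replayed reordered: 0`. Under this model the check tolerates reordering within a window but not tuples moving to a later window id, which is consistent with Vivek's point that the tuple-to-window assignment changed after recovery (tuple 145 reappears as the first tuple of ...826).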