Re: KafkaSinglePortExactlyOnceOutputOperator throwing exception about violation about Exactly once
Thanks, Sandesh. It helped.

Regards,
Vivek

--
Sent from: http://apache-apex-users-list.78494.x6.nabble.com/
Re: KafkaSinglePortExactlyOnceOutputOperator throwing exception about violation about Exactly once
1. Upstream needs to be idempotent; if it is not, you will see that exception during recovery.
2. If you are using a custom tuple class, make sure that its hashCode() and equals() methods are properly implemented.

On Wed, Mar 14, 2018 at 10:09 AM Vivek Bhide wrote:

> Exception stack trace is below
>
> 2018-03-13 17:00:53,219 INFO com.datatorrent.stram.StreamingContainerManager: Container container_1520983362676_0002_01_07 buffer server: 80e65028f6a8:60970
> 2018-03-13 17:00:54,811 INFO com.datatorrent.stram.StreamingContainerParent: child msg: Stopped running due to an exception.
> java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
>     at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
>     at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
>     at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
> context: PTContainer[id=3(container_1520983362676_0002_01_07),state=ACTIVE,operators=[PTOperator[id=3,name=kafkaOutputOperator,state=ACTIVE]]]
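To illustrate point 2 of the reply above: after a restart, replayed tuples are new object instances, so any value-based comparison only works if the tuple class overrides both equals() and hashCode(). The following is a minimal sketch in plain Java. The class EventTuple and its fields are hypothetical and not taken from the thread; the map lookup only stands in for whatever value-based matching the operator performs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class TupleEqualityExample {

    // Hypothetical tuple type; the field names are illustrative only.
    static final class EventTuple {
        private final String key;
        private final long value;

        EventTuple(String key, long value) {
            this.key = key;
            this.value = value;
        }

        // Two tuples are equal when all of their fields are equal.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof EventTuple)) {
                return false;
            }
            EventTuple other = (EventTuple) o;
            return value == other.value && Objects.equals(key, other.key);
        }

        // Must be computed from the same fields that equals() compares.
        @Override
        public int hashCode() {
            return Objects.hash(key, value);
        }
    }

    // Counts occurrences per tuple value using a hash-based lookup.
    static Map<EventTuple, Integer> countOccurrences(EventTuple... tuples) {
        Map<EventTuple, Integer> counts = new HashMap<>();
        for (EventTuple t : tuples) {
            counts.merge(t, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<EventTuple, Integer> counts = countOccurrences(
            new EventTuple("a", 1L), new EventTuple("a", 1L), new EventTuple("b", 2L));
        // A freshly created instance finds the entry because lookup is by value.
        System.out.println(counts.get(new EventTuple("a", 1L)));
    }
}
```

Without the two overrides, each instance would fall back to identity-based equality, and the lookup with a fresh instance would return null.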
Re: KafkaSinglePortExactlyOnceOutputOperator throwing exception about violation about Exactly once
Exception stack trace is below

2018-03-13 17:00:53,219 INFO com.datatorrent.stram.StreamingContainerManager: Container container_1520983362676_0002_01_07 buffer server: 80e65028f6a8:60970
2018-03-13 17:00:54,811 INFO com.datatorrent.stram.StreamingContainerParent: child msg: Stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
    at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:190)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:153)
    at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:397)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1428)
context: PTContainer[id=3(container_1520983362676_0002_01_07),state=ACTIVE,operators=[PTOperator[id=3,name=kafkaOutputOperator,state=ACTIVE]]]
Re: KafkaSinglePortExactlyOnceOutputOperator
Yes, it may be better to pull the topic selection out into an upstream operator that emits the messages on separate ports, one per topic. You can then use the exactly-once output operator without customization.

On Wed, Oct 12, 2016 at 11:29 AM, Bandaru, Srinivas <srinivas.band...@optum.com> wrote:

> Just a correction for the use case.
>
> Use case: a Kafka producer emits messages, which are processed by a DataTorrent application. Based on the application logic, the messages are divided into two groups and written to two different Kafka topics.
>
> Thanks,
> Srinivas
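The suggestion in the reply above, moving topic selection into an upstream operator with one output port per topic, can be sketched in plain Java as follows. This is only a model of the idea and has no Apex dependency: the class TopicRouter is hypothetical, and the two Consumer fields stand in for output ports that, in a real application, would each be connected to its own exactly-once output operator configured with a single topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class TopicRouterSketch {

    // Hypothetical stand-in for an operator with one input and two output ports.
    static final class TopicRouter {
        private final Predicate<String> belongsToFirstTopic;
        private final Consumer<String> firstTopicPort;
        private final Consumer<String> secondTopicPort;

        TopicRouter(Predicate<String> belongsToFirstTopic,
                    Consumer<String> firstTopicPort,
                    Consumer<String> secondTopicPort) {
            this.belongsToFirstTopic = belongsToFirstTopic;
            this.firstTopicPort = firstTopicPort;
            this.secondTopicPort = secondTopicPort;
        }

        // Routes each message to exactly one port. The decision depends only on
        // the message itself, so a replay produces the same split.
        void process(String message) {
            if (belongsToFirstTopic.test(message)) {
                firstTopicPort.accept(message);
            } else {
                secondTopicPort.accept(message);
            }
        }
    }

    public static void main(String[] args) {
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        // The routing rule here (prefix check) is purely illustrative.
        TopicRouter router = new TopicRouter(m -> m.startsWith("A"), first::add, second::add);
        for (String m : new String[] {"A1", "B1", "A2"}) {
            router.process(m);
        }
        System.out.println("first port: " + first);
        System.out.println("second port: " + second);
    }
}
```

Keeping the routing rule a pure function of the message matters here: if the split changed between the original run and a replay, the downstream writers would not see the same tuples again.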
Re: KafkaSinglePortExactlyOnceOutputOperator
The operator's recovery logic assumes that data is written to a single topic. Could this be happening because it is writing to multiple topics?

On Wed, Oct 12, 2016 at 11:25 AM, Bandaru, Srinivas <srinivas.band...@optum.com> wrote:

> Hi, we need some help with errors from KafkaSinglePortExactlyOnceOutputOperator. When monitoring the application, we observe that the operator becomes inactive and restarts continuously. Could anyone help us identify the issue?
>
> Use case: a Kafka producer emits messages, which are processed by a DataTorrent application. Based on the application logic, the messages are divided into two groups and written to two different MapR stream topics.
>
> 2016-10-12 10:42:35,134 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild the partial window after 6340600660773306680
>
> java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
>     at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:174)
>     at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:146)
>     at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:357)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
>
> Thanks,
> Srinivas
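For readers trying to picture the recovery check behind this exception, here is a simplified model in plain Java. It assumes, based only on the "Rebuild the partial window" log line and the exception text, that after a restart the operator reads back what it had already written for the unfinished window, skips those tuples when they arrive again, and fails at the end of the window if some of them never came back. The class ExactlyOnceWriter and all of its members are hypothetical; this is not the operator's source code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartialWindowCheckSketch {

    // Hypothetical model of a writer that deduplicates a replayed window.
    static final class ExactlyOnceWriter {
        // Tuples already written before the failure, with their counts.
        private final Map<String, Integer> alreadyWritten = new HashMap<>();
        // Tuples this writer instance actually sends after recovery.
        private final List<String> sent = new ArrayList<>();

        ExactlyOnceWriter(List<String> writtenBeforeFailure) {
            for (String m : writtenBeforeFailure) {
                alreadyWritten.merge(m, 1, Integer::sum);
            }
        }

        // A replayed tuple that was already written is skipped, anything else is sent.
        void process(String message) {
            Integer remaining = alreadyWritten.get(message);
            if (remaining == null) {
                sent.add(message);
            } else if (remaining == 1) {
                alreadyWritten.remove(message);
            } else {
                alreadyWritten.put(message, remaining - 1);
            }
        }

        // If previously written tuples were not replayed, the window cannot be
        // completed without duplicates or gaps, so the check fails.
        void endWindow() {
            if (!alreadyWritten.isEmpty()) {
                throw new IllegalStateException(
                    "Not all previously written tuples were received again: " + alreadyWritten.keySet());
            }
        }

        List<String> sent() {
            return sent;
        }
    }

    public static void main(String[] args) {
        // Idempotent upstream: the same tuples arrive again, only the new one is sent.
        ExactlyOnceWriter ok = new ExactlyOnceWriter(Arrays.asList("a", "b"));
        for (String m : Arrays.asList("a", "b", "c")) {
            ok.process(m);
        }
        ok.endWindow();
        System.out.println("sent after recovery: " + ok.sent());

        // Non-idempotent upstream: "b" is never replayed, so the check fails.
        ExactlyOnceWriter bad = new ExactlyOnceWriter(Arrays.asList("a", "b"));
        bad.process("a");
        bad.process("x");
        try {
            bad.endWindow();
        } catch (IllegalStateException e) {
            System.out.println("check failed: " + e.getMessage());
        }
    }
}
```

Under this model, the check can only pass when the replayed window contains every tuple that was read back, which is why anything that changes what is replayed or what is read back would trigger the failure.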
RE: KafkaSinglePortExactlyOnceOutputOperator
Just a correction for the use case.

Use case: a Kafka producer emits messages, which are processed by a DataTorrent application. Based on the application logic, the messages are divided into two groups and written to two different Kafka topics.

Thanks,
Srinivas

From: Bandaru, Srinivas [mailto:srinivas.band...@optum.com]
Sent: Wednesday, October 12, 2016 1:25 PM
To: users@apex.apache.org
Cc: Singh, Jaspal
Subject: KafkaSinglePortExactlyOnceOutputOperator

Hi,

We need some help with errors from "KafkaSinglePortExactlyOnceOutputOperator". When monitoring the application, we observe that the operator becomes inactive and restarts continuously. Could anyone help us identify the issue?

Use case: a Kafka producer emits messages, which are processed by a DataTorrent application. Based on the application logic, the messages are divided into two groups and written to two different MapR stream topics.

Resource manager log snippet:

2016-10-12 10:42:35,134 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild the partial window after 6340600660773306680
2016-10-12 10:40:29,603 INFO com.example.datatorrent.TenantUpdateValidator: error
2016-10-12 10:40:29,605 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=topicUpdate,type=GENERIC,checkpoint={57fe56b5012b, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=jsonObject,sourceNodeId=2,sourcePortName=out,locality=,partitionMask=0,partitionKeys=]],outputs=[]]] stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
    at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:174)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:146)
    at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:357)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
2016-10-12 10:40:29,617 INFO org.apache.kafka.clients.producer.KafkaProducer: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2016-10-12 10:40:29,681 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [3]
2016-10-12 10:40:29,682 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.

2016-10-12 10:36:13,513 INFO com.datatorrent.bufferserver.server.Server: Received subscriber request: SubscribeRequestTuple{version=1.0, identifier=tcp://dbslt0080:60777/2.out.1, windowId=57fe57bf, type=jsonObject/3.inputPort, upstreamIdentifier=2.out.1, mask=0, partitions=null, bufferSize=0}
2016-10-12 10:36:15,534 ERROR com.datatorrent.netlet.AbstractLengthPrependerClient: Disconnecting Server.Subscriber{type=jsonObject/3.inputPort, mask=0, partitions=null} because of an exception.
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:166)
    at com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:356)
    at com.datatorrent.netlet.OptimizedEventLoop$SelectedSelectionKeySet.forEach(OptimizedEventLoop.java:59)
    at com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:192)
    at com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:157)
    at com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:156)
    at java.lang.Thread.run(Thread.java:745)

Thanks,
Srinivas

This e-mail, including attachments, may include confidential and/or proprietary information, and may be used only by the person or entity to which it is addressed. If the reader of this e-mail is not the intended recipient or his or her authorized agent, the reader is hereby notified that any dissemination, distribution or copying of this e-mail is prohibited. If you have received this e-mail in error, please notify the sender by replying to this message and delete this e-mail immediately.