Yes, it may be better to pull the topic selection out into an upstream operator that emits the messages on a separate port per topic. You can then use the exactly-once output operator without customization.
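
To illustrate the routing idea, here is a minimal sketch in plain Java. It is not the Apex API: the class name TopicRouter, the two "port" callbacks, and the grouping predicate are all hypothetical stand-ins. In an actual Apex application the two callbacks would presumably be two output ports on the upstream operator, each connected to its own exactly-once output operator configured for one topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for an upstream routing operator (NOT the Apex API).
// Each Consumer models one output port; each port feeds exactly one topic.
class TopicRouter {
    private final Predicate<String> belongsToFirstGroup; // assumed grouping rule
    private final Consumer<String> firstTopicPort;       // models port for topic 1
    private final Consumer<String> secondTopicPort;      // models port for topic 2

    TopicRouter(Predicate<String> belongsToFirstGroup,
                Consumer<String> firstTopicPort,
                Consumer<String> secondTopicPort) {
        this.belongsToFirstGroup = belongsToFirstGroup;
        this.firstTopicPort = firstTopicPort;
        this.secondTopicPort = secondTopicPort;
    }

    // Decide the destination per message and emit it on exactly one port,
    // so the downstream writers never have to choose a topic themselves.
    void process(String message) {
        if (belongsToFirstGroup.test(message)) {
            firstTopicPort.accept(message);
        } else {
            secondTopicPort.accept(message);
        }
    }

    public static void main(String[] args) {
        List<String> firstTopic = new ArrayList<>();
        List<String> secondTopic = new ArrayList<>();

        // Illustrative rule only: messages starting with "A" go to the first topic.
        TopicRouter router =
            new TopicRouter(m -> m.startsWith("A"), firstTopic::add, secondTopic::add);

        for (String message : new String[] {"A-1", "B-1", "A-2"}) {
            router.process(message);
        }

        System.out.println("first topic:  " + firstTopic);
        System.out.println("second topic: " + secondTopic);
    }
}
```

With the routing decision made upstream, each downstream writer only ever sees the tuples for its own topic, which is what lets the stock exactly-once operator be used as-is.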

On Wed, Oct 12, 2016 at 11:29 AM, Bandaru, Srinivas <srinivas.band...@optum.com> wrote:

> Just a correction for the use case.
>
> Use case: a Kafka producer emits messages, which are processed by a
> DataTorrent application. Based on the DT application logic, the messages
> are divided into two groups and written to two different Kafka topics.
>
> Thanks,
> Srinivas
>
> From: Bandaru, Srinivas [mailto:srinivas.band...@optum.com]
> Sent: Wednesday, October 12, 2016 1:25 PM
> To: users@apex.apache.org
> Cc: Singh, Jaspal
> Subject: KafkaSinglePortExactlyOnceOutputOperator
>
> Hi,
>
> We need some help with the errors we are getting from
> KafkaSinglePortExactlyOnceOutputOperator. When monitoring the
> application, we observe that the operator becomes inactive and restarts
> continuously. Could anyone help us identify the issue?
>
> Use case: a Kafka producer emits messages, which are processed by a
> DataTorrent application. Based on the DT application logic, the messages
> are divided into two groups and written to two different MapR stream
> topics.
>
> Resource manager log snippet:
>
> 2016-10-12 10:42:35,134 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild the partial window after 6340600660773306680
> 2016-10-12 10:40:29,603 INFO com.example.datatorrent.TenantUpdateValidator: error
> 2016-10-12 10:40:29,605 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=topicUpdate,type=GENERIC,checkpoint={57fe56b50000012b, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=jsonObject,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
> java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
>     at org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:174)
>     at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:146)
>     at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:357)
>     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
> 2016-10-12 10:40:29,617 INFO org.apache.kafka.clients.producer.KafkaProducer: Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2016-10-12 10:40:29,681 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [3]
> 2016-10-12 10:40:29,682 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
>
> 2016-10-12 10:36:13,513 INFO com.datatorrent.bufferserver.server.Server: Received subscriber request: SubscribeRequestTuple{version=1.0, identifier=tcp://dbslt0080:60777/2.out.1, windowId=57fe57bf, type=jsonObject/3.inputPort, upstreamIdentifier=2.out.1, mask=0, partitions=null, bufferSize=0}
> 2016-10-12 10:36:15,534 ERROR com.datatorrent.netlet.AbstractLengthPrependerClient: Disconnecting Server.Subscriber{type=jsonObject/3.inputPort, mask=0, partitions=null} because of an exception.
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>     at com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:166)
>     at com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:356)
>     at com.datatorrent.netlet.OptimizedEventLoop$SelectedSelectionKeySet.forEach(OptimizedEventLoop.java:59)
>     at com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:192)
>     at com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:157)
>     at com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:156)
>     at java.lang.Thread.run(Thread.java:745)
>
> Thanks,
> Srinivas
>
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.