Yes, it may be better to pull out the topic selection into an upstream
operator that emits the messages on separate ports per topic and then you
can use the exactly-once output operator without customization.


On Wed, Oct 12, 2016 at 11:29 AM, Bandaru, Srinivas <
srinivas.band...@optum.com> wrote:

> Just a correction for use case.
>
>
>
> Use case: kafka producer emits messages and processing messages using
> Datatorrent application, Based on the DT application logic divide the
> messages into two different groups and write it to two different kafka
> topics
>
>
>
> Thanks,
>
> Srinivas
>
>
>
>
>
> *From:* Bandaru, Srinivas [mailto:srinivas.band...@optum.com]
> *Sent:* Wednesday, October 12, 2016 1:25 PM
> *To:* users@apex.apache.org
> *Cc:* Singh, Jaspal
> *Subject:* KafkaSinglePortExactlyOnceOutputOperator
>
>
>
> Hi, Need some help with the errors we are having with” KafkaSinglePort
> *ExactlyOnce*OutputOperator”. When monitoring the application, We are
> observing that the operator getting inactive and  restarts continuously.
> Could anyone help us with identifying an issue?
>
>
>
> Use case: kafka producer emits messages and processing messages using
> Datatorrent application, Based on the DT application logic divide the
> messages into two different groups and write it to two different Mapr
> strem topics.
>
>
>
>
>
>
>
> Resource manager log snippet.
>
> *2016-10-12 10:42:35,134 INFO
> org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator:
> Rebuild the partial window after 6340600660773306680*
>
> *2016-10-12 10:40:29,603 INFO
> com.example.datatorrent.TenantUpdateValidator: error*
>
> *2016-10-12 10:40:29,605 ERROR
> com.datatorrent.stram.engine.StreamingContainer: Operator set
> [OperatorDeployInfo[id=3,name=topicUpdate,type=GENERIC,checkpoint={57fe56b50000012b,
> 0,
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=jsonObject,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
> stopped running due to an exception.*
>
> *java.lang.RuntimeException: Violates Exactly once. Not all the tuples
> received after operator reset.*
>
> *                at
> org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:174)*
>
> *                at
> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:146)*
>
> *                at
> com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:357)*
>
> *                at
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)*
>
> *2016-10-12 10:40:29,617 INFO
> org.apache.kafka.clients.producer.KafkaProducer: Closing the Kafka producer
> with timeoutMillis = 9223372036854775807 ms.*
>
> *2016-10-12 10:40:29,681 INFO
> com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [3]*
>
> *2016-10-12 10:40:29,682 INFO
> com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.*
>
>
>
>
>
> *2016-10-12 10:36:13,513 INFO com.datatorrent.bufferserver.server.Server:
> Received subscriber request: SubscribeRequestTuple{version=1.0,
> identifier=tcp://dbslt0080:60777/2.out.1, windowId=57fe57bf,
> type=jsonObject/3.inputPort, upstreamIdentifier=2.out.1, mask=0,
> partitions=null, bufferSize=0}*
>
> *2016-10-12 10:36:15,534 ERROR
> com.datatorrent.netlet.AbstractLengthPrependerClient: Disconnecting
> Server.Subscriber{type=jsonObject/3.inputPort, mask=0, partitions=null}
> because of an exception.*
>
> *java.io.IOException: Connection reset by peer*
>
> *        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)*
>
> *        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)*
>
> *        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)*
>
> *        at sun.nio.ch.IOUtil.read(IOUtil.java:197)*
>
> *        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)*
>
> *        at
> com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:166)*
>
> *        at
> com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:356)*
>
> *        at
> com.datatorrent.netlet.OptimizedEventLoop$SelectedSelectionKeySet.forEach(OptimizedEventLoop.java:59)*
>
> *        at
> com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:192)*
>
> *        at
> com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:157)*
>
> *        at
> com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:156)*
>
> *        at java.lang.Thread.run(Thread.java:745)*
>
>
>
>
>
> Thanks,
>
> *Srinivas*
>
>
>
>
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
>
>
> This e-mail, including attachments, may include confidential and/or
> proprietary information, and may be used only by the person or entity
> to which it is addressed. If the reader of this e-mail is not the intended
> recipient or his or her authorized agent, the reader is hereby notified
> that any dissemination, distribution or copying of this e-mail is
> prohibited. If you have received this e-mail in error, please notify the
> sender by replying to this message and delete this e-mail immediately.
>

Reply via email to