Hi, Need some help with the errors we are having with"
KafkaSinglePortExactlyOnceOutputOperator". When monitoring the application, We
are observing that the operator getting inactive and restarts continuously.
Could anyone help us with identifying an issue?
Use case: kafka producer emits messages and processing messages using
Datatorrent application, Based on the DT application logic divide the messages
into two different groups and write it to two different Mapr strem topics.
Resource manager log snippet.
2016-10-12 10:42:35,134 INFO
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild
the partial window after 6340600660773306680
2016-10-12 10:40:29,603 INFO com.example.datatorrent.TenantUpdateValidator:
error
2016-10-12 10:40:29,605 ERROR com.datatorrent.stram.engine.StreamingContainer:
Operator set
[OperatorDeployInfo[id=3,name=topicUpdate,type=GENERIC,checkpoint={57fe56b50000012b,
0,
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=jsonObject,sourceNodeId=2,sourcePortName=out,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received
after operator reset.
at
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator.endWindow(KafkaSinglePortExactlyOnceOutputOperator.java:174)
at
com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:146)
at
com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:357)
at
com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
2016-10-12 10:40:29,617 INFO org.apache.kafka.clients.producer.KafkaProducer:
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2016-10-12 10:40:29,681 INFO com.datatorrent.stram.engine.StreamingContainer:
Undeploy request: [3]
2016-10-12 10:40:29,682 INFO com.datatorrent.stram.engine.StreamingContainer:
Undeploy complete.
2016-10-12 10:36:13,513 INFO com.datatorrent.bufferserver.server.Server:
Received subscriber request: SubscribeRequestTuple{version=1.0,
identifier=tcp://dbslt0080:60777/2.out.1, windowId=57fe57bf,
type=jsonObject/3.inputPort, upstreamIdentifier=2.out.1, mask=0,
partitions=null, bufferSize=0}
2016-10-12 10:36:15,534 ERROR
com.datatorrent.netlet.AbstractLengthPrependerClient: Disconnecting
Server.Subscriber{type=jsonObject/3.inputPort, mask=0, partitions=null} because
of an exception.
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at com.datatorrent.netlet.AbstractClient.read(AbstractClient.java:166)
at
com.datatorrent.netlet.DefaultEventLoop.handleSelectedKey(DefaultEventLoop.java:356)
at
com.datatorrent.netlet.OptimizedEventLoop$SelectedSelectionKeySet.forEach(OptimizedEventLoop.java:59)
at
com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:192)
at
com.datatorrent.netlet.OptimizedEventLoop.runEventLoop(OptimizedEventLoop.java:157)
at
com.datatorrent.netlet.DefaultEventLoop.run(DefaultEventLoop.java:156)
at java.lang.Thread.run(Thread.java:745)
Thanks,
Srinivas
This e-mail, including attachments, may include confidential and/or
proprietary information, and may be used only by the person or entity
to which it is addressed. If the reader of this e-mail is not the intended
recipient or his or her authorized agent, the reader is hereby notified
that any dissemination, distribution or copying of this e-mail is
prohibited. If you have received this e-mail in error, please notify the
sender by replying to this message and delete this e-mail immediately.