Some more information on the application:
Kafka input operator ==> Deserialization of Avro ==> Enrich the message with some text ==> Unifier (auto-generated) ==> write to HDFS

Kafka input operator ----> running in 10 instances, with the ONE_TO_MANY setting
Deserialization of Avro ----> running in 10 instances with parallel partition
Enrich the message with some text ----> running in 10 instances with parallel partition
Unifier ----> running in a SINGLE instance, accumulating all the messages from the 10 partitions; receiving approx. 1000 msgs per sec; running with memory set to 20 GB
write to HDFS ----> running in a SINGLE instance, collecting all the messages from the Unifier; receiving approx. 1000 msgs per sec; running with memory set to 20 GB

Please advise.

Regards,
Raja.

From: Pramod Immaneni <pra...@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Thursday, July 13, 2017 at 11:27 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: [EXTERNAL] Re: hdfs file write operator is increasing the latency - resulting entire DAG to fail

If the data can be written to different files, then you can have multiple partitions, with each partition writing to a disjoint set of files. You cannot have two partitions writing to the same file. Because the file output operator lets the implementation supply a file name for every tuple, you could provide different file names in the different partitions. To make data belonging to the same file go to the same partition, you may need to specify a stream codec.
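
A minimal sketch of the grouping idea above, in plain Java with no Apex dependencies so it can be run on its own. Everything named here is hypothetical: the class `FileRouting`, the "one file per Kafka partition" naming rule, and the `.dat` suffix are illustrations only. The assumption is that logic like `partitionKey` would sit inside an override of `StreamCodec.getPartition`, so that tuples destined for the same file carry the same key. The modulo step only illustrates how keys might spread over writers; Apex's own key-to-partition assignment may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Apex or Malhar.
class FileRouting {

    // Hypothetical naming rule: one output file per Kafka partition id.
    static String fileNameFor(int kafkaPartition) {
        return "events-" + kafkaPartition + ".dat";
    }

    // The value a StreamCodec.getPartition(tuple) override could return:
    // it depends only on the file name, so tuples for one file share a key.
    static int partitionKey(String fileName) {
        return fileName.hashCode();
    }

    // Illustration only: map a key onto one of `writers` downstream partitions.
    static int writerFor(String fileName, int writers) {
        return Math.floorMod(partitionKey(fileName), writers);
    }

    public static void main(String[] args) {
        int writers = 4; // matches the StatelessPartitioner:4 example in the thread
        Map<Integer, List<String>> assignment = new TreeMap<>();
        for (int p = 0; p < 10; p++) {
            String file = fileNameFor(p);
            assignment.computeIfAbsent(writerFor(file, writers), k -> new ArrayList<>()).add(file);
        }
        // Each file appears under exactly one writer, so no two writers share a file.
        assignment.forEach((w, files) -> System.out.println("writer " + w + " -> " + files));
    }
}
```

The point of the design is that the key is derived from the file name rather than from the tuple contents, which is what keeps the sets of files disjoint across partitions.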
Please see https://ci.apache.org/projects/apex-core/apex-core-javadoc-release-3.6/com/datatorrent/api/StreamCodec.html

To specify the number of partitions, for example 4, you can use the following attribute:

    <property>
      <name>dt.operator.HDFS_operator.attr.PARTITIONER</name>
      <value>com.datatorrent.common.partitioner.StatelessPartitioner:4</value>
    </property>

Second, the rate you mentioned, 2 MB/s, isn't too high for a single partition, so I am wondering if something else is going on to increase latencies. In your implementation of the operator, are you doing any buffering or any heavy processing?

Thanks

On Thu, Jul 13, 2017 at 9:07 AM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:

Thanks for the response, Pramod.

- My HDFS operator is running in a single partition, with an input of approx. 1000 msgs per sec. I am not sure how to partition this operator ☹
- I am not really sure how to check the bytes/sec, but I expect it to be large, because my message size in Kafka is approx. 2 KB. ===> input 1000 msgs per sec * 2 KB == approx. 2 MB per sec [rough calculation]
- And for your info, right now I have set the memory for this operator to 20 GB using the property below, which I feel is very large.

    <property>
      <name>dt.operator.HDFS_operator.attr.MEMORY_MB</name>
      <value>20480</value>
    </property>

Please advise.

Thanks a lot.
Raja.

From: Pramod Immaneni <pra...@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Thursday, July 13, 2017 at 10:31 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: [EXTERNAL] Re: hdfs file write operator is increasing the latency - resulting entire DAG to fail

Hi Raja,

How many partitions do you have for the file output operator, and what would you say your data write rate is in bytes/second?
Thanks

On Thu, Jul 13, 2017 at 8:13 AM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:

Team,

We have an Apex application that is reading from Kafka and writing to HDFS. The data flow for the Kafka topic is very high, say 2500 messages per sec!

The issue we are facing is: the operator (which extends AbstractFileOutputOperator) that writes to HDFS builds up latency over time and eventually fails.

Can someone please share your thoughts on how I can handle this?

Thanks a lot.

Regards,
Raja.