For 1, try relaunching with the previous application id, using the *-originalAppId* parameter of the *launch* command documented at:
http://docs.datatorrent.com/apexcli/

The starting offset can also be controlled by the *initialOffset* configuration property documented at:
http://apex.apache.org/docs/malhar/operators/kafkaInputOperator/
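
As a rough sketch of how the *initialOffset* property can be set from the application's properties.xml: this assumes the Kafka input operator was added to the DAG under the name *kafkaIn*, which is a placeholder of mine and not something from your application, so substitute whatever name your code passes to addOperator.

```xml
<!-- properties.xml fragment (sketch). "kafkaIn" is a placeholder operator name. -->
<property>
  <name>dt.operator.kafkaIn.prop.initialOffset</name>
  <!-- Value taken from the question; see the operator docs for the other allowed values. -->
  <value>APPLICATION_OR_LATEST</value>
</property>
```

Setting it in the properties file rather than in code means the value can be changed at launch time without rebuilding the application package.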

For 2, the abstract base class tracks some metrics in the metrics field, which is annotated with *@AutoMetric*. These stats are retrievable via a *StatsListener* associated with the operator. An example that logs the metrics to the Application Master log file (*dt.log*) is available at (*kafka-stats* branch):
https://github.com/amberarrow/examples/tree/kafka-stats/tutorials/kafka

The log lines it prints look something like this:

*2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener: 1: processStats: metrics.size = 1*
*2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener: metric: key = metrics, value.class = org.apache.apex.malhar.kafka.KafkaMetrics*
*2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener: bytes-consumed-rate = 7614.458510728283, records-consumed-rate = 253.8277174679261*

These are the metrics documented at:
http://docs.confluent.io/2.0.1/kafka/monitoring.html

For 3, perhaps the Kafka experts can respond.

Ram

On Thu, Dec 8, 2016 at 8:36 AM, Arvindan Thulasinathan <aravindan.thulasinat...@oracle.com> wrote:

> Hi,
> I am using “org.apache.apex.malhar.kafka.AbstractKafkaInputOperator”
> which supports the 0.9.0+ version of the Kafka consumer API. I have InitialOffset
> set to “APPLICATION_OR_LATEST”.
>
> I have a few questions.
> 1. When I kill my application and restart, I expect the app to restart
> from the last offset it consumed. But this is not happening. It
> automatically starts consuming from the latest offset. Is it because I am
> killing the app and not shutting down?
> 2. Is there a way to measure my Kafka consumption rate from the Apex
> application? I am looking for something out-of-the-box rather than sending the
> consumption rate over a stream to an output operator.
> 3. I couldn’t track offsets on the Kafka broker.
> I tried using the "bin/kafka-consumer-groups.sh --new-consumer
> --bootstrap-server <kafka-broker>:<port> --list" script on my Kafka broker,
> but the result is empty. Any idea whether I need to add some parameter/config
> to my Apex Kafka consumer to enable it to commit offsets to the broker?
>
>
> Thanks,
> Aravindan
>