For 1, try launching with the previous application id documented at:
http://docs.datatorrent.com/apexcli/
under the *-originalAppId* parameter of the *launch* command. The starting
offset can also be controlled
by the *initialOffset* configuration property documented at:
http://apex.apache.org/docs/malhar/operators/kafkaInputOperator/

For 2, the abstract base class is tracking some metrics in the metrics
field annotated with *@AutoMetric*.
These stats are retrievable via a *StatsListener* associated with the
operator. An example that logs
metrics to the Application Master log file (*dt.log*) is available a (
*kafka-stats* branch)t:

https://github.com/amberarrow/examples/tree/kafka-stats/tutorials/kafka

The log lines it prints look something like this:

*2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener: 1:
processStats: metrics.size = 1*
*2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener:
metric: key = metrics, value.class =
org.apache.apex.malhar.kafka.KafkaMetrics*
*2016-12-09 10:39:01,456 INFO com.example.myapexapp.KafkaStatsListener:
bytes-consumed-rate = 7614.458510728283, records-consumed-rate =
253.8277174679261*

These are the metrics documented at:
http://docs.confluent.io/2.0.1/kafka/monitoring.html

For 3, perhaps the Kafka experts can respond.

Ram

On Thu, Dec 8, 2016 at 8:36 AM, Arvindan Thulasinathan <
aravindan.thulasinat...@oracle.com> wrote:

> Hi,
>   I am using “org.apache.apex.malhar.kafka.AbstractKafkaInputOperator”
> which supports 0.9.0 + version of Kafka consumer API. I have InitialOffset
> set to “APPLICATION_OR_LATEST”
>
> I have a few questions.
> 1. When I kill my application and restart, I expect the app to restart
> from the last offset it consumed. But this is not happening. It
> automatically starts consuming from the latest offest. Is it because I am
> killing the app and not shutting-down?
> 2. Is there a way to measure my Kafka consumption rate from the Apex
> application? I am looking for something out-of-box rather than sending the
> consumption rate over a stream to an output operator.
> 3. I couldn’t track offsets on the Kafka broker.
> I tried using: "bin/kafka-consumer-groups.sh --new-consumer
> --bootstrap-server <kafa-broker>:<port> —list” script on my Kafka broker.
> But the result is empty. Any idea, if I need to add some parameter/config
> to my Apex Kafka consumer to enable it to commit offset to Broker?
>
>
> Thanks,
> Aravindan
>

Reply via email to