Re: Kafka 2.7.1 Rebalance failed DisconnectException

2021-07-16 Thread Tony John
Hi All,

An update on this. I was finally able to figure out the cause. I have a
consumer with *MAX_POLL_INTERVAL_MS_CONFIG* set to *Integer.MAX_VALUE*,
which was causing the problem. It looks like the combination of
*group.initial.rebalance.delay.ms* on the broker and *max.poll.interval.ms*
on the client causes the *Rebalance failed.
org.apache.kafka.common.errors.DisconnectException*. After debugging I could
see the line below in the AbstractCoordinator class (line #337), which leads
to an integer overflow if *max.poll.interval.ms* is greater than
(Integer.MAX_VALUE - 5000), so *joinGroupTimeoutMs* falls back to the
request timeout. If *request.timeout.ms* is then less than
*group.initial.rebalance.delay.ms*, the issue occurs. Let me know what you
think. For now I can get away with changing max.poll.interval.ms.

AbstractCoordinator #337
int joinGroupTimeoutMs = Math.max(this.client.defaultRequestTimeoutMs(),
this.rebalanceConfig.rebalanceTimeoutMs + 5000);
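
To make the overflow concrete, here is a standalone sketch (illustrative
values only, not the actual client code):

public class JoinGroupTimeoutOverflow {
    public static void main(String[] args) {
        int defaultRequestTimeoutMs = 30_000;        // request.timeout.ms on the client
        int rebalanceTimeoutMs = Integer.MAX_VALUE;  // max.poll.interval.ms = Integer.MAX_VALUE

        // Integer.MAX_VALUE + 5000 wraps around to a large negative int ...
        int overflowed = rebalanceTimeoutMs + 5000;

        // ... so Math.max() falls back to the request timeout instead of
        // max.poll.interval.ms + 5000.
        int joinGroupTimeoutMs = Math.max(defaultRequestTimeoutMs, overflowed);

        System.out.println(overflowed);         // -2147478649 (overflow)
        System.out.println(joinGroupTimeoutMs); // 30000 -> the JoinGroup request can expire
                                                // before group.initial.rebalance.delay.ms elapses
    }
}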

Thanks,
Tony



On Wed, Jul 14, 2021 at 10:56 PM Tony John  wrote:

> Hi Shilin,
>
> Thanks for the suggestion. But I am not upgrading an existing cluster.
> I've got a fresh broker and application cluster and there are no consumer
> offsets or topics present. When the app starts it creates the topics and
> once it moves to RUNNING state I see the rebalance failed log every 30
> seconds. As I understand, the steps in the doc need to be followed only if
> an existing cluster is being migrated to the new version. Am I missing
> something here? Below is my KafkaConfig from one of the brokers during
> start-up.
>
>
> [2021-07-14 07:27:06,271] INFO KafkaConfig values:
> advertised.host.name = null
> advertised.listeners = PLAINTEXT://broker100:9092
> advertised.port = null
> alter.config.policy.class.name = null
> alter.log.dirs.replication.quota.window.num = 11
> alter.log.dirs.replication.quota.window.size.seconds = 1
> authorizer.class.name =
> auto.create.topics.enable = true
> auto.leader.rebalance.enable = true
> background.threads = 10
> broker.id = 100
> broker.id.generation.enable = true
> broker.rack = null
> client.quota.callback.class = null
> compression.type = producer
> connection.failed.authentication.delay.ms = 100
> connections.max.idle.ms = 108
> connections.max.reauth.ms = 0
> control.plane.listener.name = null
> controlled.shutdown.enable = true
> controlled.shutdown.max.retries = 3
> controlled.shutdown.retry.backoff.ms = 5000
> controller.quota.window.num = 11
> controller.quota.window.size.seconds = 1
> controller.socket.timeout.ms = 3
> create.topic.policy.class.name = null
> default.replication.factor = 1
> delegation.token.expiry.check.interval.ms = 360
> delegation.token.expiry.time.ms = 8640
> delegation.token.master.key = null
> delegation.token.max.lifetime.ms = 60480
> delete.records.purgatory.purge.interval.requests = 1
> delete.topic.enable = true
> fetch.max.bytes = 57671680
> fetch.purgatory.purge.interval.requests = 1000
> group.initial.rebalance.delay.ms = 12
> group.max.session.timeout.ms = 120
> group.max.size = 2147483647
> group.min.session.timeout.ms = 6000
> host.name =
> inter.broker.listener.name = null
> inter.broker.protocol.version = 2.7-IV2
> kafka.metrics.polling.interval.secs = 10
> kafka.metrics.reporters = []
> leader.imbalance.check.interval.seconds = 300
> leader.imbalance.per.broker.percentage = 10
> listener.security.protocol.map =
> PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> listeners = PLAINTEXT://broker100:9092
> log.cleaner.backoff.ms = 15000
> log.cleaner.dedupe.buffer.size = 134217728
> log.cleaner.delete.retention.ms = 8640
> log.cleaner.enable = true
> log.cleaner.io.buffer.load.factor = 0.9
> log.cleaner.io.buffer.size = 524288
> log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
> log.cleaner.max.compaction.lag.ms = 9223372036854775807
> log.cleaner.min.cleanable.ratio = 0.5
> log.cleaner.min.compaction.lag.ms = 0
> log.cleaner.threads = 1
> log.cleanup.policy = [delete]
> log.dir = /tmp/kafka-logs
> log.dirs 

Re: Kafka 2.7.1 Rebalance failed DisconnectException

2021-07-14 Thread Tony John
false
zookeeper.ssl.protocol = TLSv1.2
zookeeper.ssl.truststore.location = null
zookeeper.ssl.truststore.password = null
zookeeper.ssl.truststore.type = null
zookeeper.sync.time.ms = 2000

Thanks,
Tony

On Wed, Jul 14, 2021 at 4:58 PM Shilin Wu  wrote:

> Depending on your original version, you may have to consult the upgrade
> guide.
> https://kafka.apache.org/27/documentation.html#upgrade
>
> Didn't see important compatibility settings like:
> [screenshot omitted: compatibility settings from the upgrade guide]
>
>
> Perhaps you are not doing it correctly.
>
>
> Wu Shilin
> Solution Architect, Confluent
> +6581007012
>
>
> On Wed, Jul 14, 2021 at 7:21 PM Tony John 
> wrote:
>
>> Can someone help me on this.
>>
>> Thanks,
>> Tony
>>
>> On Fri, Jul 9, 2021 at 8:15 PM Tony John 
>> wrote:
>>
>> > Hi All,
>> >
>> > I am trying to upgrade my Kafka streams application to 2.7.1 version of
>> > Kafka. The brokers are upgraded to 2.7.1 and kafka dependencies are
>> also on
>> > 2.7.1. But when I start the application, rebalance is failing with the
>> > following message
>> >
>> > Rebalance failed. org.apache.kafka.common.errors.DisconnectException
>> >
>> > I am also seeing Group coordinator broker102:9092 (id: 2147483645 rack:
>> > null) is unavailable or invalid due to cause: coordinator
>> > unavailable.isDisconnected: false. Rediscovery will be attempted.
>> >
>> > The full set of logs (which gets printed every 30 seconds) is given
>> below
>> >
>> > INFO  2021-07-09 09:33:20.229 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
>> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Group
>> coordinator
>> > broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid
>> due to
>> > cause: null.isDisconnected: true. Rediscovery will be attempted.
>> > INFO  2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
>> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered
>> group
>> > coordinator broker102:9092 (id: 2147483645 rack: null)
>> > INFO  2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
>> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Group
>> coordinator
>> > broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid
>> due to
>> > cause: coordinator unavailable.isDisconnected: false. Rediscovery will
>> be
>> > attempted.
>> > INFO  2021-07-09 09:33:20.330 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
>> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered
>> group
>> > coordinator broker102:9092 (id: 2147483645 rack: null)
>> > INFO  2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
>> > groupId=my-app-v1-task-coordinator-consumer-app-node100] Rebalance
>> failed.
>> > org.apache.kafka.common.errors.DisconnectException
>> > INFO  2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
>> > groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining
>> group
>> > INFO  2021-07-09 09:33:20.333 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
>> > groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining
>> group
>> > INFO  2021-07-09 09:33:20.419 | internals.AbstractCoordinator [Consumer
>> > clientId=consumer-my-app-v1-master-coordinator-consumer-8,
>> > groupId=my-app-v1-master-coordinator-consumer] Group coordinator
>> > broker101:909

Re: Kafka 2.7.1 Rebalance failed DisconnectException

2021-07-14 Thread Tony John
Can someone help me on this.

Thanks,
Tony

On Fri, Jul 9, 2021 at 8:15 PM Tony John  wrote:

> Hi All,
>
> I am trying to upgrade my Kafka streams application to 2.7.1 version of
> Kafka. The brokers are upgraded to 2.7.1 and kafka dependencies are also on
> 2.7.1. But when I start the application, rebalance is failing with the
> following message
>
> Rebalance failed. org.apache.kafka.common.errors.DisconnectException
>
> I am also seeing Group coordinator broker102:9092 (id: 2147483645 rack:
> null) is unavailable or invalid due to cause: coordinator
> unavailable.isDisconnected: false. Rediscovery will be attempted.
>
> The full set of logs (which gets printed every 30 seconds) is given below
>
> INFO  2021-07-09 09:33:20.229 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
> groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator
> broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to
> cause: null.isDisconnected: true. Rediscovery will be attempted.
> INFO  2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
> groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group
> coordinator broker102:9092 (id: 2147483645 rack: null)
> INFO  2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
> groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator
> broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to
> cause: coordinator unavailable.isDisconnected: false. Rediscovery will be
> attempted.
> INFO  2021-07-09 09:33:20.330 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
> groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group
> coordinator broker102:9092 (id: 2147483645 rack: null)
> INFO  2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
> groupId=my-app-v1-task-coordinator-consumer-app-node100] Rebalance failed.
> org.apache.kafka.common.errors.DisconnectException
> INFO  2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
> groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group
> INFO  2021-07-09 09:33:20.333 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
> groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group
> INFO  2021-07-09 09:33:20.419 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-master-coordinator-consumer-8,
> groupId=my-app-v1-master-coordinator-consumer] Group coordinator
> broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to
> cause: null.isDisconnected: true. Rediscovery will be attempted.
> INFO  2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-master-coordinator-consumer-8,
> groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator
> broker101:9092 (id: 2147483646 rack: null)
> INFO  2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-master-coordinator-consumer-8,
> groupId=my-app-v1-master-coordinator-consumer] Group coordinator
> broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to
> cause: coordinator unavailable.isDisconnected: false. Rediscovery will be
> attempted.
> INFO  2021-07-09 09:33:20.521 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-master-coordinator-consumer-8,
> groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator
> broker101:9092 (id: 2147483646 rack: null)
> INFO  2021-07-09 09:33:20.522 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-master-coordinator-consumer-8,
> groupId=my-app-v1-master-coordinator-consumer] Rebalance failed.
> org.apache.kafka.common.errors.DisconnectException
> INFO  2021-07-09 09:33:20.523 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-master-coordinator-consumer-8,
> groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group
> INFO  2021-07-09 09:33:20.524 | internals.AbstractCoordinator [Consumer
> clientId=consumer-my-app-v1-master-coordinator-consumer-8,
> groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group
>
> The application was working fine on 2.5.1. Please note with 2.5.1 the
> build used was kafka_*2.12*-2.5.1, but with 2.7.1 I used kaf

Kafka 2.7.1 Rebalance failed DisconnectException

2021-07-09 Thread Tony John
Hi All,

I am trying to upgrade my Kafka streams application to 2.7.1 version of
Kafka. The brokers are upgraded to 2.7.1 and kafka dependencies are also on
2.7.1. But when I start the application, rebalance is failing with the
following message

Rebalance failed. org.apache.kafka.common.errors.DisconnectException

I am also seeing Group coordinator broker102:9092 (id: 2147483645 rack:
null) is unavailable or invalid due to cause: coordinator
unavailable.isDisconnected: false. Rediscovery will be attempted.

The full set of logs (which gets printed every 30 seconds) is given below

INFO  2021-07-09 09:33:20.229 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator
broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to
cause: null.isDisconnected: true. Rediscovery will be attempted.
INFO  2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group
coordinator broker102:9092 (id: 2147483645 rack: null)
INFO  2021-07-09 09:33:20.230 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
groupId=my-app-v1-task-coordinator-consumer-app-node100] Group coordinator
broker102:9092 (id: 2147483645 rack: null) is unavailable or invalid due to
cause: coordinator unavailable.isDisconnected: false. Rediscovery will be
attempted.
INFO  2021-07-09 09:33:20.330 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
groupId=my-app-v1-task-coordinator-consumer-app-node100] Discovered group
coordinator broker102:9092 (id: 2147483645 rack: null)
INFO  2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
groupId=my-app-v1-task-coordinator-consumer-app-node100] Rebalance failed.
org.apache.kafka.common.errors.DisconnectException
INFO  2021-07-09 09:33:20.331 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group
INFO  2021-07-09 09:33:20.333 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-task-coordinator-consumer-app-node100-9,
groupId=my-app-v1-task-coordinator-consumer-app-node100] (Re-)joining group
INFO  2021-07-09 09:33:20.419 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-master-coordinator-consumer-8,
groupId=my-app-v1-master-coordinator-consumer] Group coordinator
broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to
cause: null.isDisconnected: true. Rediscovery will be attempted.
INFO  2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-master-coordinator-consumer-8,
groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator
broker101:9092 (id: 2147483646 rack: null)
INFO  2021-07-09 09:33:20.420 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-master-coordinator-consumer-8,
groupId=my-app-v1-master-coordinator-consumer] Group coordinator
broker101:9092 (id: 2147483646 rack: null) is unavailable or invalid due to
cause: coordinator unavailable.isDisconnected: false. Rediscovery will be
attempted.
INFO  2021-07-09 09:33:20.521 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-master-coordinator-consumer-8,
groupId=my-app-v1-master-coordinator-consumer] Discovered group coordinator
broker101:9092 (id: 2147483646 rack: null)
INFO  2021-07-09 09:33:20.522 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-master-coordinator-consumer-8,
groupId=my-app-v1-master-coordinator-consumer] Rebalance failed.
org.apache.kafka.common.errors.DisconnectException
INFO  2021-07-09 09:33:20.523 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-master-coordinator-consumer-8,
groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group
INFO  2021-07-09 09:33:20.524 | internals.AbstractCoordinator [Consumer
clientId=consumer-my-app-v1-master-coordinator-consumer-8,
groupId=my-app-v1-master-coordinator-consumer] (Re-)joining group

The application was working fine on 2.5.1. Please note with 2.5.1 the build
used was kafka_*2.12*-2.5.1, but with 2.7.1 I used kafka_*2.13*-2.7.1

*The broker config is *

broker.id=2
listeners=PLAINTEXT://broker102:9092
advertised.listeners=PLAINTEXT://broker102:9092
num.network.threads=4
num.io.threads=4
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/mnt/store/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=4
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log

Re: Kafka Streams : Rack Aware deployment in EC2

2020-10-25 Thread Tony John
Thanks a lot Guozhang. Will give it a try.
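
For reference, a minimal sketch of what I plan to try, following Guozhang's
suggestion below and assuming brokers and clients on 2.4+ where KIP-392 is
available; the application id, bootstrap servers and rack value are
illustrative:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RackAwareStreamsConfig {
    static Properties buildProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker100:9092");
        // KIP-392: let the embedded consumers fetch from the closest replica.
        // The value must match the broker.rack of the brokers in the same AZ.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.CLIENT_RACK_CONFIG),
                "us-east-1a");
        return props;
    }
}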

Thanks,
Tony

On Tue, Oct 20, 2020 at 10:42 PM Guozhang Wang  wrote:

> Hello Tony,
>
> I think you already know the consumer-client side fetch-from-follower
> feature:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
>
> If your Kafka deployment is cross-AZ, and your Streams app is also deployed
> cross-AZ, you can set "client.rack" through the Streams configs to let the
> embedded consumers read from close replicas within the same AZ as well.
>
> Guozhang
>
>
> On Mon, Oct 19, 2020 at 9:19 PM Tony John 
> wrote:
>
> > Thanks Matthias.. I don't think I will be able to take it up.. Will wait
> > for it to be available in future. :)
> >
> > On Mon, Oct 19, 2020 at 11:56 PM Matthias J. Sax 
> wrote:
> >
> > > No suggestions... Sorry.
> > >
> > > The task assignment algorithm cannot be customized.
> > >
> > > The only thing you _could_ do, is to pickup the ticket yourself (to get
> > > the feature maybe in 2.8 release). Not sure if you would be interested
> > > to contribute :)
> > >
> > >
> > > -Matthias
> > >
> > > On 10/19/20 11:08 AM, Tony John wrote:
> > > > Thanks for the quick response Matthias. We were planning to move to
> AWS
> > > > MSK, and I think we will have to defer it until rack awareness is in
> > > place
> > > > for Kafka Streams. Reason being consumers consuming from brokers in
> > > > different availability zones would result in high data transfer
> > charges.
> > > Do
> > > > you have any suggestions?
> > > >
> > > > Thanks,
> > > > Tony
> > > >
> > > > On Mon, Oct 19, 2020 at 11:08 PM Matthias J. Sax 
> > > wrote:
> > > >
> > > >> There is no such feature for Kafka Streams yet.
> > > >>
> > > >> We have one ticket for rack aware standby task placement
> > > >> (https://issues.apache.org/jira/browse/KAFKA-6718) but nobody is
> > > working
> > > >> on it.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >> On 10/19/20 9:20 AM, Tony John wrote:
> > > >>> Hi All,
> > > >>>
> > > >>> I have been trying to figure out some documentation on how to
> enable
> > > rack
> > > >>> awareness on a Kafka Streams app. I do see the broker.rack
> > > configuration
> > > >>> which needs to be done at the broker side and client.rack
> > configuration
> > > >> for
> > > >>> the consumers. Is there any specific configuration which is
> required
> > to
> > > >>> enable rack awareness in a streams app? Please throw some light on
> > > this.
> > > >>>
> > > >>> PS: I am using Kafka / Kafka Streams 2.2.1
> > > >>>
> > > >>> Thanks,
> > > >>> Tony
> > > >>>
> > > >>
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>


Re: Kafka Streams : Rack Aware deployment in EC2

2020-10-19 Thread Tony John
Thanks Matthias.. I don't think I will be able to take it up.. Will wait
for it to be available in future. :)

On Mon, Oct 19, 2020 at 11:56 PM Matthias J. Sax  wrote:

> No suggestions... Sorry.
>
> The task assignment algorithm cannot be customized.
>
> The only thing you _could_ do, is to pickup the ticket yourself (to get
> the feature maybe in 2.8 release). Not sure if you would be interested
> to contribute :)
>
>
> -Matthias
>
> On 10/19/20 11:08 AM, Tony John wrote:
> > Thanks for the quick response Matthias. We were planning to move to AWS
> > MSK, and I think we will have to defer it until rack awareness is in
> place
> > for Kafka Streams. Reason being consumers consuming from brokers in
> > different availability zones would result in high data transfer charges.
> Do
> > you have any suggestions?
> >
> > Thanks,
> > Tony
> >
> > On Mon, Oct 19, 2020 at 11:08 PM Matthias J. Sax 
> wrote:
> >
> >> There is no such feature for Kafka Streams yet.
> >>
> >> We have one ticket for rack aware standby task placement
> >> (https://issues.apache.org/jira/browse/KAFKA-6718) but nobody is
> working
> >> on it.
> >>
> >>
> >> -Matthias
> >>
> >> On 10/19/20 9:20 AM, Tony John wrote:
> >>> Hi All,
> >>>
> >>> I have been trying to figure out some documentation on how to enable
> rack
> >>> awareness on a Kafka Streams app. I do see the broker.rack
> configuration
> >>> which needs to be done at the broker side and client.rack configuration
> >> for
> >>> the consumers. Is there any specific configuration which is required to
> >>> enable rack awareness in a streams app? Please throw some light on
> this.
> >>>
> >>> PS: I am using Kafka / Kafka Streams 2.2.1
> >>>
> >>> Thanks,
> >>> Tony
> >>>
> >>
> >
>


Re: Kafka Streams : Rack Aware deployment in EC2

2020-10-19 Thread Tony John
Thanks for the quick response Matthias. We were planning to move to AWS
MSK, and I think we will have to defer it until rack awareness is in place
for Kafka Streams. Reason being consumers consuming from brokers in
different availability zones would result in high data transfer charges. Do
you have any suggestions?

Thanks,
Tony

On Mon, Oct 19, 2020 at 11:08 PM Matthias J. Sax  wrote:

> There is no such feature for Kafka Streams yet.
>
> We have one ticket for rack aware standby task placement
> (https://issues.apache.org/jira/browse/KAFKA-6718) but nobody is working
> on it.
>
>
> -Matthias
>
> On 10/19/20 9:20 AM, Tony John wrote:
> > Hi All,
> >
> > I have been trying to figure out some documentation on how to enable rack
> > awareness on a Kafka Streams app. I do see the broker.rack configuration
> > which needs to be done at the broker side and client.rack configuration
> for
> > the consumers. Is there any specific configuration which is required to
> > enable rack awareness in a streams app? Please throw some light on this.
> >
> > PS: I am using Kafka / Kafka Streams 2.2.1
> >
> > Thanks,
> > Tony
> >
>


Kafka Streams : Rack Aware deployment in EC2

2020-10-19 Thread Tony John
Hi All,

I have been trying to figure out some documentation on how to enable rack
awareness on a Kafka Streams app. I do see the broker.rack configuration
which needs to be done at the broker side and client.rack configuration for
the consumers. Is there any specific configuration which is required to
enable rack awareness in a streams app? Please throw some light on this.

PS: I am using Kafka / Kafka Streams 2.2.1

Thanks,
Tony


Kafka Streams 1.1.0 - Significant Performance Drop

2018-04-27 Thread Tony John
Hi All,

I was trying to switch to the latest version of Streams (1.1.0) and started
seeing a significant drop in performance of the application. I was on
0.11.0.2 before. After some checks I found that the choking point was the
RocksDB flush, which contributes almost 80% of the CPU time (PFA the
screenshots). One thing the application was doing was calling
context.commit() from the Processor's process method for each record it
processed. After throttling this and restricting the commit to every 100K
records, the performance was on par with the previous version. So below are
my queries:

   1. Is it wrong to request a store commit for each record that gets
   processed?
   2. Are there any other configurations I need to make to get rid of this,
   other than throttling the commits?
   3. Or is it actually an issue with 1.1.0? I don't think that will be the
   case, as I haven't seen anyone else reporting this so far.

Please suggest.
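
For reference, a rough sketch of the throttling I described above (1.x
Processor API; the class name and threshold are illustrative and the store
update logic is omitted):

import org.apache.kafka.streams.processor.AbstractProcessor;

public class ThrottledCommitProcessor extends AbstractProcessor<String, String> {
    private static final int COMMIT_EVERY = 100_000;
    private long processed = 0;

    @Override
    public void process(String key, String value) {
        // ... update the state store(s) here ...
        processed++;
        // Request a commit every 100K records instead of once per record.
        // context().commit() only requests a commit; the actual commit happens
        // on the next commit interval.
        if (processed % COMMIT_EVERY == 0) {
            context().commit();
        }
    }
}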




Thanks,
Tony


Re: Kafka Streams : TimeoutException Expiring Records

2018-02-22 Thread Tony John
Thanks a lot Bill for looking into this. I will definitely attempt the
suggestions and let you know the outcome. I have gone through KIP-91, but I
am struggling to understand the behavior. Does it mean that these errors
happen due to a failure in the broker? If so, why would it kill all the
threads and thus make the consumers unavailable? Also, is there any way to
handle these errors gracefully so that they do not kill the threads and
hence the consumers?
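
For reference, a minimal sketch of how I understand the suggested workaround
would be applied through the Streams producer prefix (values are illustrative,
not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ProducerTimeoutWorkaround {
    static void apply(Properties props) {
        // Large request timeout as suggested for the KIP-91 symptom.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                Integer.MAX_VALUE);
        // Secondary knobs mentioned in the reply below.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG),
                Integer.MAX_VALUE);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), 10);
    }
}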

Thanks,
Tony

On Fri, Feb 23, 2018 at 1:15 AM, Bill Bejeck  wrote:

> Hi Tony,
>
> Looks like you have a known issue that KIP-91(
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 91+Provide+Intuitive+User+Timeouts+in+The+Producer)
> will address.
>
> In the meantime, as a workaround, you could try
> setting REQUEST_TIMEOUT_MS_CONFIG to a large value (Integer.MAX_VALUE ?)
> other secondary configurations to consider changing would be increasing "
> max.block.ms" and "retries"
>
> Thanks,
> Bill
>
> On Thu, Feb 22, 2018 at 8:14 AM, Tony John 
> wrote:
>
> > Hi All,
> >
> > I am running into an issue with my Kafka Streams application. The
> > application was running fine for almost 2 weeks, then it started throwing
> > the below exception which caused the threads to die. Now when I restart
> the
> > application, it dies quickly (1-2 hrs) when trying to catch up the lag.
> >
> > The application is running on an AWS EC2 instance with 8 core processor
> and
> > 16GB of memory. The streams config is given below and more logs are
> > available below (I have stripped off some logs which I thought may not be
> > relevant). Towards the end of this thread you will be able to see a lot of
> > exceptions similar to the below one + RocksDBExceptions
> > (org.apache.kafka.streams.errors.ProcessorStateException: Error opening
> > store vstore at location
> > /mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please
> take
> > a look at it and let me know what could be wrong here?
> >
> > INFO  2018-02-21 08:37:20.758 [Engine2-StreamThread-2]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and
> > standby tasks [4_0, 1_30] in 0ms
> > ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread |
> > Engine2-StreamThread-6-producer]
> > org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task
> > [1_34] Error sending record to topic cv-v1-cv. No more offsets will be
> > recorded for this task and the exception will eventually be thrown
> > org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s)
> for
> > cv-v1-cv-2: 31439 ms has passed since last append
> > DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3]
> > org.apache.kafka.streams.processor.internals.StreamThread -
> stream-thread
> > [Engine2-StreamThread-3] processing latency 46282 > commit time 3 for
> > 1 records. Adjusting down recordsProcessedBeforeCommit=6482
> > ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
> > org.apache.kafka.streams.processor.internals.AssignedTasks -
> stream-thread
> > [Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the
> > following error:
> > org.apache.kafka.streams.errors.StreamsException: task [1_34] exception
> > caught when producing
> > at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> > checkForException(RecordCollectorImpl.java:137)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> RecordCollectorImpl.flush(
> > RecordCollectorImpl.java:145)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamTask.flushState(
> > StreamTask.java:296)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask$1.run(StreamTask.java:275)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > measureLatencyNs(StreamsMetricsImpl.java:201)
> > ~[kafka-streams-0.11.0.2.jar:?
> > ]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.
> > StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(
> > AssignedTasks.java:374)
> > ~[kafka-streams-0.11.0.2.jar:?]
> > at org.apache.kafka.streams.processor.internals.AssignedTasks.

Kafka Streams : TimeoutException Expiring Records

2018-02-22 Thread Tony John
Hi All,

I am running into an issue with my Kafka Streams application. The
application was running fine for almost 2 weeks, then it started throwing
the below exception which caused the threads to die. Now when I restart the
application, it dies quickly (1-2 hrs) when trying to catch up the lag.

The application is running on an AWS EC2 instance with 8 core processor and
16GB of memory. The streams config is given below and more logs are
available below (I have stripped off some logs which I thought may not be
relevant). Towards the end of this thread you will be able to see a lot of
exceptions similar to the below one + RocksDBExceptions
(org.apache.kafka.streams.errors.ProcessorStateException: Error opening
store vstore at location
/mnt/store/kafka-streams/pe-v1/1_10/rocksdb/vstore). Could you please take
a look at it and let me know what could be wrong here?

INFO  2018-02-21 08:37:20.758 [Engine2-StreamThread-2]
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[Engine2-StreamThread-2] Committed all active tasks [7_12, 6_3, 1_27] and
standby tasks [4_0, 1_30] in 0ms
ERROR 2018-02-21 08:37:24.853 [kafka-producer-network-thread |
Engine2-StreamThread-6-producer]
org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task
[1_34] Error sending record to topic cv-v1-cv. No more offsets will be
recorded for this task and the exception will eventually be thrown
org.apache.kafka.common.errors.TimeoutException: Expiring 74 record(s) for
cv-v1-cv-2: 31439 ms has passed since last append
DEBUG 2018-02-21 08:37:24.859 [Engine2-StreamThread-3]
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[Engine2-StreamThread-3] processing latency 46282 > commit time 3 for
1 records. Adjusting down recordsProcessedBeforeCommit=6482
ERROR 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread
[Engine2-StreamThread-6] Failed to commit stream task 1_34 due to the
following error:
org.apache.kafka.streams.errors.StreamsException: task [1_34] exception
caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
checkForException(RecordCollectorImpl.java:137)
~[kafka-streams-0.11.0.2.jar:?]
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:145)
~[kafka-streams-0.11.0.2.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:296)
~[kafka-streams-0.11.0.2.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:275)
~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
measureLatencyNs(StreamsMetricsImpl.java:201) ~[kafka-streams-0.11.0.2.jar:?
]
at org.apache.kafka.streams.processor.internals.
StreamTask.commit(StreamTask.java:270) ~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.
StreamTask.commit(StreamTask.java:264) ~[kafka-streams-0.11.0.2.jar:?]
at 
org.apache.kafka.streams.processor.internals.AssignedTasks$3.apply(AssignedTasks.java:374)
~[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.
applyToRunningTasks(AssignedTasks.java:420) [kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.
punctuateAndCommit(AssignedTasks.java:357) [kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.
processAndPunctuate(StreamThread.java:662) [kafka-streams-0.11.0.2.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513)
[kafka-streams-0.11.0.2.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482)
[kafka-streams-0.11.0.2.jar:?]
at org.apache.kafka.streams.processor.internals.
StreamThread.run(StreamThread.java:459) [kafka-streams-0.11.0.2.jar:?]
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 74
record(s) for cv-v1-cv-2: 31439 ms has passed since last append
INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[Engine2-StreamThread-6] Shutting down
INFO  2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[Engine2-StreamThread-6] State transition from PARTITIONS_ASSIGNED to
PENDING_SHUTDOWN.
DEBUG 2018-02-21 08:37:24.865 [Engine2-StreamThread-6]
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[Engine2-StreamThread-6] Shutting down all active tasks [6_12, 1_34, 7_15],
standby tasks [1_2, 1_7], suspended tasks [1_18, 7_15, 6_3, 1_27], and
suspended standby tasks [1_3, 1_30]
INFO  2018-02-21 08:37:24.879 [Engine2-StreamThread-3]
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[Engine2-StreamThread-3] Committed all active tasks [1_31, 1_24, 6_7] and
standby tasks [1_0, 1_6, 

Re: Kafka Streams : Connection to node -1 could not be established. Broker may not be available.

2018-02-07 Thread Tony John
Thanks a lot Guozhang. I was able to nail it down by looking at the log line
you suggested. The log revealed that it was trying to connect to localhost,
and it was a problem with one of my sub-components. It was trying to read
the broker configuration from a property file which didn't exist, and so it
defaulted to the loopback address.
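
In hindsight, even a small defensive loader would have caught this by failing
fast instead of silently falling back to localhost; a sketch (the file path
and property key are hypothetical):

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class BrokerConfigLoader {
    static String bootstrapServers(String path) throws IOException {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        String servers = props.getProperty("bootstrap.servers");
        if (servers == null || servers.trim().isEmpty()) {
            // Fail fast rather than defaulting to the loopback address.
            throw new IllegalStateException("bootstrap.servers missing in " + path);
        }
        return servers;
    }
}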

Thanks a lot for the help.

Thanks,
Tony

On Wed, Feb 7, 2018 at 3:05 AM, Guozhang Wang  wrote:

> Hi Tony,
>
> Your Streams configs look good to me, and the additional streams log from
> StreamThread are normal operational logs that do not related to the issue.
> I suspect there is a network partition between your client to the broker
> node, and to investigate which host this `node -1` is referring to (note
> that -1 is actually a valid node id, read from the bootstrapped list), you
> can look into the same log file and search for the following line:
>
> "Initiating connection to node.."
>
> and then find out the host:port of the node with id -1.
>
> Guozhang
>
> On Tue, Feb 6, 2018 at 8:16 AM, Tony John 
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for looking into this. Below are the stream config values.
> >
> > INFO  2018-02-02 08:33:25.708 [main] org.apache.kafka.streams.
> > StreamsConfig
> > - StreamsConfig values:
> > application.id = cv-v1
> > application.server =
> > bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> > buffered.records.per.partition = 1000
> > cache.max.bytes.buffering = 104857600
> > client.id = I2
> > commit.interval.ms = 3
> > connections.max.idle.ms = 54
> > default.key.serde = class
> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > default.timestamp.extractor = class
> > org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> > default.value.serde = class
> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > key.serde = null
> > metadata.max.age.ms = 30
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.recording.level = DEBUG
> > metrics.sample.window.ms = 3
> > num.standby.replicas = 1
> > num.stream.threads = 1
> > partition.grouper = class
> > org.apache.kafka.streams.processor.DefaultPartitionGrouper
> > poll.ms = 100
> > processing.guarantee = at_least_once
> > receive.buffer.bytes = 32768
> > reconnect.backoff.max.ms = 1000
> > reconnect.backoff.ms = 50
> > replication.factor = 2
> > request.timeout.ms = 4
> > retry.backoff.ms = 100
> > rocksdb.config.setter = null
> > security.protocol = PLAINTEXT
> > send.buffer.bytes = 131072
> > state.cleanup.delay.ms = 60
> > state.dir = /mnt/store/kafka-streams
> > timestamp.extractor = null
> > value.serde = null
> > windowstore.changelog.additional.retention.ms = 8640
> > zookeeper.connect =
> > INFO  2018-02-02 08:33:25.870 [main] org.apache.kafka.streams.
> > StreamsConfig
> > - StreamsConfig values:
> > application.id = pe-v1
> > application.server =
> > bootstrap.servers = [172.31.10.35:9092, 172.31.14.8:9092]
> > buffered.records.per.partition = 1000
> > cache.max.bytes.buffering = 2147483648
> > client.id = I2
> > commit.interval.ms = 3
> > connections.max.idle.ms = 54
> > default.key.serde = class
> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > default.timestamp.extractor = class
> > org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> > default.value.serde = class
> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > key.serde = null
> > metadata.max.age.ms = 30
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.recording.level = DEBUG
> > metrics.sample.window.ms = 3
> > num.standby.replicas = 1
> > num.stream.threads = 3
> > partition.grouper = class
> > org.apache.kafka.streams.processor.DefaultPartitionGrouper
> > poll.ms = 100
> > processing.guarantee = at_least_once
> > receive.buffer.bytes = 32768
> > reconnect.backoff.max.ms = 1000
> > reconnect.backoff.ms = 50
> > replication.factor = 2
> > request.timeout.ms = 4
> > retry.backoff.ms = 100
> > rocksdb.config.setter = null
> > security.protocol = PLAINTEXT
> > send.buffer.bytes = 131072
> > state.cleanup.delay.ms = 60
> > state.dir = /mnt/store/kafka-streams
> > timestamp.extractor = null
> > value.serde = null
> > windowstore.changelog.additional.retention.ms = 8640
> > zookeeper.connect =
> >
> > Please note there are 2 streams applicatio

Re: Kafka Streams : Connection to node -1 could not be established. Broker may not be available.

2018-02-06 Thread Tony John
d - stream-thread
[I2-StreamThread-7] processing latency 2698 < commit time 3 for 6880
records. Adjusting up recordsProcessedBeforeCommit=76501


Thanks,
Tony

On Tue, Feb 6, 2018 at 3:21 AM, Guozhang Wang  wrote:

> Hello Tony,
>
>
> Could you share your Streams config values so that people can help further
> investigating your issue?
>
>
> Guozhang
>
>
> On Mon, Feb 5, 2018 at 12:00 AM, Tony John 
> wrote:
>
> > Hi All,
> >
> > I have been running a streams application for some time. The application
> > runs fine for a while but after a day or two I see the below log getting
> > printed continuously onto the console.
> >
> > WARN  2018-02-05 02:50:04.060 [kafka-producer-network-thread |
> producer-1]
> > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not
> > be
> > established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.160 [kafka-producer-network-thread |
> producer-1]
> > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not
> > be
> > established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.261 [kafka-producer-network-thread |
> producer-1]
> > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not
> > be
> > established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.311 [kafka-producer-network-thread |
> producer-1]
> > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not
> > be
> > established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.361 [kafka-producer-network-thread |
> producer-1]
> > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not
> > be
> > established. Broker may not be available.
> >
> > WARN  2018-02-05 02:50:04.411 [kafka-producer-network-thread |
> producer-1]
> > org.apache.kafka.clients.NetworkClient - Connection to node -1 could not
> > be
> > established. Broker may not be available.
> >
> > At this time, though the application is able to process the messages, I
> > could also see lag building up in the consumers and the processing time
> for
> > a batch has increased 15 folds.
> >
> > I am using a single zoo-keeper instance with 2 brokers and 4 application
> > instances. I checked the broker and zoo-keeper status, they are all
> running
> > fine as I could see. I have also verified the connectivity between the
> > application and broker instances using telnet and it seems intact. The
> > kafka broker and streams/client versions are 0.11.0.2. Results of broker
> > status results from zoo-keeper below
> >
> >
> > [root@app100 kafka]# echo dump | nc localhost 2181
> >
> > SessionTracker dump:
> >
> > Session Sets (3):
> >
> > 0 expire at Mon Feb 05 06:16:39 UTC 2018:
> >
> > 1 expire at Mon Feb 05 06:16:42 UTC 2018:
> >
> > 0x161562860970001
> >
> > 1 expire at Mon Feb 05 06:16:45 UTC 2018:
> >
> > 0x16156286097
> >
> > ephemeral nodes dump:
> >
> > Sessions with Ephemerals (2):
> >
> > 0x16156286097:
> >
> > /brokers/ids/0
> >
> > /controller
> >
> > 0x161562860970001:
> >
> > /brokers/ids/1
> >
> > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh
> > localhost:2181 <<< "get /brokers/ids/0"
> >
> > Connecting to localhost:2181
> >
> > Welcome to ZooKeeper!
> >
> > JLine support is disabled
> >
> >
> > WATCHER::
> >
> >
> > WatchedEvent state:SyncConnected type:None path:null
> >
> > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"}
> > ,"endpoints":["PLAINTEXT://172.31.10.35:9092"],"jmx_port":
> > 5,"host":"172.31.10.35","timestamp":"1517569007467","
> > port":9092,"version":4}
> >
> > cZxid = 0x1c
> >
> > ctime = Fri Feb 02 10:56:47 UTC 2018
> >
> > mZxid = 0x1c
> >
> > mtime = Fri Feb 02 10:56:47 UTC 2018
> >
> > pZxid = 0x1c
> >
> > cversion = 0
> >
> > dataVersion = 0
> >
> > aclVersion = 0
> >
> > ephemeralOwner = 0x16156286097
> >
> > dataLength = 197
> >
> > numChildren = 0
> >
> > [root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh
> > localhost:2181 <<< "get /brokers/ids/1"
> >
> > Connecting to localhost:2181
> >
> > Welcome to ZooKeeper!
> >
> > JLine support is disabled
> >
> >
> > WATCHER::
> >
> >
> > WatchedEvent state:SyncConnected type:None path:null
> >
> > {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"}
> > ,"endpoints":["PLAINTEXT://172.31.14.8:9092"],"jmx_port":
> > 5,"host":"172.31.14.8","timestamp":"1517569016562","
> > port":9092,"version":4}
> >
> > cZxid = 0x21
> >
> > ctime = Fri Feb 02 10:56:56 UTC 2018
> >
> > mZxid = 0x21
> >
> > mtime = Fri Feb 02 10:56:56 UTC 2018
> >
> > pZxid = 0x21
> >
> > cversion = 0
> >
> > dataVersion = 0
> >
> > aclVersion = 0
> >
> > ephemeralOwner = 0x161562860970001
> >
> > dataLength = 195
> >
> > numChildren = 0
> >
> > Could you please throw some light on this as to what could be going wrong
> > here?
> >
> > Thanks,
> > Tony
> >
>
>
>
> --
> -- Guozhang
>


Kafka Streams : Connection to node -1 could not be established. Broker may not be available.

2018-02-05 Thread Tony John
Hi All,

I have been running a streams application for some time. The application
runs fine for a while but after a day or two I see the below log getting
printed continuously onto the console.

WARN  2018-02-05 02:50:04.060 [kafka-producer-network-thread | producer-1]
org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be
established. Broker may not be available.

WARN  2018-02-05 02:50:04.160 [kafka-producer-network-thread | producer-1]
org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be
established. Broker may not be available.

WARN  2018-02-05 02:50:04.261 [kafka-producer-network-thread | producer-1]
org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be
established. Broker may not be available.

WARN  2018-02-05 02:50:04.311 [kafka-producer-network-thread | producer-1]
org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be
established. Broker may not be available.

WARN  2018-02-05 02:50:04.361 [kafka-producer-network-thread | producer-1]
org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be
established. Broker may not be available.

WARN  2018-02-05 02:50:04.411 [kafka-producer-network-thread | producer-1]
org.apache.kafka.clients.NetworkClient - Connection to node -1 could not be
established. Broker may not be available.

At this time, though the application is still able to process the messages,
I can also see lag building up in the consumers, and the processing time for
a batch has increased 15-fold.

I am using a single ZooKeeper instance with 2 brokers and 4 application
instances. I checked the broker and ZooKeeper status, and as far as I can
see they are all running fine. I have also verified the connectivity between
the application and broker instances using telnet and it seems intact. The
Kafka broker and streams/client versions are 0.11.0.2. The results of the
broker status queries against ZooKeeper are below.


[root@app100 kafka]# echo dump | nc localhost 2181

SessionTracker dump:

Session Sets (3):

0 expire at Mon Feb 05 06:16:39 UTC 2018:

1 expire at Mon Feb 05 06:16:42 UTC 2018:

0x161562860970001

1 expire at Mon Feb 05 06:16:45 UTC 2018:

0x16156286097

ephemeral nodes dump:

Sessions with Ephemerals (2):

0x16156286097:

/brokers/ids/0

/controller

0x161562860970001:

/brokers/ids/1

[root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh
localhost:2181 <<< "get /brokers/ids/0"

Connecting to localhost:2181

Welcome to ZooKeeper!

JLine support is disabled


WATCHER::


WatchedEvent state:SyncConnected type:None path:null

{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"}
,"endpoints":["PLAINTEXT://172.31.10.35:9092"],"jmx_port":
5,"host":"172.31.10.35","timestamp":"1517569007467","
port":9092,"version":4}

cZxid = 0x1c

ctime = Fri Feb 02 10:56:47 UTC 2018

mZxid = 0x1c

mtime = Fri Feb 02 10:56:47 UTC 2018

pZxid = 0x1c

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x16156286097

dataLength = 197

numChildren = 0

[root@app100 kafka]# ./kafka_2.11-0.11.0.2/bin/zookeeper-shell.sh
localhost:2181 <<< "get /brokers/ids/1"

Connecting to localhost:2181

Welcome to ZooKeeper!

JLine support is disabled


WATCHER::


WatchedEvent state:SyncConnected type:None path:null

{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"}
,"endpoints":["PLAINTEXT://172.31.14.8:9092"],"jmx_port":
5,"host":"172.31.14.8","timestamp":"1517569016562","
port":9092,"version":4}

cZxid = 0x21

ctime = Fri Feb 02 10:56:56 UTC 2018

mZxid = 0x21

mtime = Fri Feb 02 10:56:56 UTC 2018

pZxid = 0x21

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x161562860970001

dataLength = 195

numChildren = 0

Could you please throw some light on this as to what could be going wrong
here?

Thanks,
Tony


Re: Kafka Streams : CommitFailedException

2017-11-08 Thread Tony John
Hello Guozhang,

Thanks for the analysis. I figured out the reason for the OOM: it was
actually caused by an in-memory queue in the app itself. I have fixed it
and right now it all looks good. Sorry for the inconvenience and thanks for
helping out.

Thanks,
Tony

On Wed, Nov 8, 2017 at 1:20 AM, Guozhang Wang  wrote:

> Hello Tony,
>
> You mentioned in 0.11.0.0 the
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
> 1) while in 0.11.0.1 props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
> 2).
> But from your logs it seems you set this config as 2 in both versions.
> Right?
>
> Anyway, I took a look into your logs and I think you are hitting a known
> issue (https://issues.apache.org/jira/browse/KAFKA-5152) that has been
> fixed in 0.11.0.1; that is why you only see the WARN log entry in 0.11.0.1
> but the app is not dying out. The running-out-of-memory issue seems not
> related to the CommitFailed error. Do you have any stateful operations in
> your app that use an iterator? Did you close the iterator after complete
> using it?
>
>
> Guozhang
>
>
> On Tue, Nov 7, 2017 at 12:42 AM, Tony John 
> wrote:
>
> > Hi Guozang,
> >
> > Thanks for looking into this. I was using 0.11.0.0 version of the library
> > earlier when I was getting the CommitFailed exception and the tasks were
> > terminating. The application config then was Replication Factor = 2, Num
> > Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll
> > Interval = 2147483647. The streams config code (*Streams Config While
> > Using
> > 0.11.0.0*) is given below and the logs of the application while using
> > 0.11.0.0 can be downloaded from
> > https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0
> >
> > I have upgraded the libraries to 0.11.0.1 and ran into some other issues.
> > Though the CommitFailed error logs are still showing up with 0.11.0.1 the
> > tasks are not getting terminated, but the app quickly runs out of memory
> > (GC overhead limit exceeded) and the CPU is choked, which was not the
> case
> > earlier. The logs are available @
> > https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and
> > application config is also given below (*Streams Config While Using
> > 0.11.0.1*).  Since I am not sure what configuration helps reduce the
> > CommitFailed error, which I think could be one of the reasons for the CPU
> > choke and eventually cause an OOM, I have gone ahead and used all
> possible
> > configuration parameters, but still no luck.
> >
> > It would be great if you could shed some light on this as to what could
> be
> > causing this problem.
> >
> > *Streams Config While Using 0.11.0.0 *
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> > props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> > POLL_RECORDS_CONFIG),
> > 1000)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> > POLL_INTERVAL_MS_CONFIG),
> > Int.MAX_VALUE)
> >
> > streams = KafkaStreams(builder, StreamsConfig(props))
> > streams.start()
> >
> >
> > *Streams Config While Using 0.11.0.1*
> >
> > val props = Properties()
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> > EngineConfig.APPLICATION_ID)
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > EngineConfig.KAFKA_SERVERS)
> > props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> > props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> > props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
> >
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTB
> > EAT_INTERVAL_MS_CONFIG),
> > 1)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_
> > COMMIT_INTERVAL_MS_CONFIG),
> > 1)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE
> > _AUTO_COMMIT_CONFIG),
> > true)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> > POLL_RECORDS_CONFIG),
> > 1)
> > props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_
> > POLL_INTERVAL_MS_CONFIG),
> > Int.MAX_VALUE)
> > props.put(StreamsConfig.consumerPrefix(Consum

Re: Kafka Streams : CommitFailedException

2017-11-07 Thread Tony John
Hi Guozhang,

Thanks for looking into this. I was using 0.11.0.0 version of the library
earlier when I was getting the CommitFailed exception and the tasks were
terminating. The application config then was Replication Factor = 2, Num
Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll
Interval = 2147483647. The streams config code (*Streams Config While Using
0.11.0.0*) is given below and the logs of the application while using
0.11.0.0 can be downloaded from
https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0

I have upgraded the libraries to 0.11.0.1 and ran into some other issues.
Though the CommitFailed error logs are still showing up with 0.11.0.1 the
tasks are not getting terminated, but the app quickly runs out of memory
(GC overhead limit exceeded) and the CPU is choked, which was not the case
earlier. The logs are available @
https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and
application config is also given below (*Streams Config While Using
0.11.0.1*). Since I am not sure which configuration helps reduce the
CommitFailed error, which I think could be one of the reasons for the CPU
choke and eventually the OOM, I have gone ahead and tried all possible
configuration parameters, but still no luck.

It would be great if you could shed some light on this as to what could be
causing this problem.

*Streams Config While Using 0.11.0.0 *

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


*Streams Config While Using 0.11.0.1*

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
true)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
1)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
Int.MAX_VALUE)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
3)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


Thanks,
Tony

On Thu, Nov 2, 2017 at 4:39 PM, Tony John  wrote:

> Hi All,
>
> I am facing a CommitFailedException in my streams application. As per the
> log I tried changing max.poll.interval.ms and max.poll.records, but neither
> helped. PFA the full stack trace of the exception; below is the streams
> configuration used. What else could be wrong?
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
>
> Thanks,
> Tony
>


Kafka Streams : CommitFailedException

2017-11-02 Thread Tony John
Hi All,

I am facing a CommitFailedException in my streams application. As per the log
I tried changing max.poll.interval.ms and max.poll.records, but neither
helped. PFA the full stack trace of the exception; below is the streams
configuration used. What else could be wrong?

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


Thanks,
Tony


Re: Kafka Streams : Problem with Global State Restoration

2017-10-20 Thread Tony John
Hi Damian,

Thanks a lot for the response. I just saw your reply when I visited the
mailing-list archive
<http://mail-archives.apache.org/mod_mbox/kafka-users/201710.mbox/browser>.
Unfortunately I hadn't received it in my inbox, and I didn't even see the
update in the archive when I checked earlier today. Anyway, once again thanks
a lot for the response. I will raise a JIRA as you suggested, and I hope this
isn't the case with local state stores.
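
For what it's worth, local stores with logging enabled are restored from
their changelog topics, which are written by the processor after the
transformation, so the restored contents should already be the processed
values. A rough sketch of a logging-enabled local store with the 0.11.x
Stores API (store name, serdes and changelog configs are illustrative):

// Sketch (0.11.x API): a changelogged local store. On rebalance it is rebuilt
// from its changelog topic, which holds the processor's output rather than
// the raw source-topic records.
val localStore = Stores.create("query-results")          // illustrative name
        .withKeys(Serdes.String())
        .withValues(Serdes.String())
        .persistent()
        .enableLogging(mapOf<String, String>())          // extra changelog topic configs
        .build()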

Thanks,
Tony

On Wed, Oct 18, 2017 at 9:21 PM, Tony John  wrote:

> Hello All,
>
> I have been trying to create an application on top of Kafka Streams. I am a
> newbie to Kafka & Kafka Streams, so please excuse me if my understanding is
> wrong.
>
> I got the application running fine on a single EC2 instance in AWS. Now I
> am looking at scaling and ran into some issues. The application has a
> global state store and a couple of other local ones backed by RocksDB. It
> uses the Processor API, and the stream is built using the TopologyBuilder.
> The global state store is fed by a topic which sends key-value pairs (both
> protobuf objects) and is connected to a processor which transforms the
> value by applying some logic and finally stores the key and the modified
> value in the store. Similarly, the local stores are connected via
> processors which are fed by different topics. Now the issue is that when I
> launch a new instance of the app, task re-allocation and state restoration
> happen, and the stores get replicated onto the new instance. But the global
> store which is replicated onto the new instance has some other data (I
> guess that's the raw data) as opposed to the processed data.
>
> *Application Topology*
>
> *Global Store*
>
> Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
> false) -> GlobalStoreProcessor (Persistent, Caching enabled, logging
> disabled) -> Global Store
>
> *Local Store*
>
> Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
> true)
>
>  -> LocalStoreProcessor (
> Persistent, Caching enabled, Logging enabled
>
> ) -> Local state stores on different partitions
>
> *Sample Code (Written in Kotlin)*
>
> val streams: KafkaStreams
> init {
> val builder = KStreamBuilder().apply {
>
> val globalStore = Stores.create(Config.DICTIONARY)
> .withKeys(Serdes.String())
> .withValues(Serdes.String())
> .persistent()
> .enableCaching()
> .disableLogging()
> .build() as StateStoreSupplier<KeyValueStore<String, String>>
>
> addGlobalStore(globalStore, "dictionary-words-source", 
> Serdes.String().deserializer(), Serdes.String().deserializer(),
> Config.DICTIONARY_WORDS_TOPIC, "dictionary-words-processor", 
> DictionaryWordsProcessor.Supplier)
>
>
> addSource("query-source", Serdes.String().deserializer(), 
> Serdes.String().deserializer(), Config.QUERIES_TOPIC)
> addProcessor("query-processor", QueryProcessor.Supplier, 
> "query-source")
>
> }
>
> val config = StreamsConfig(mapOf(StreamsConfig.APPLICATION_ID_CONFIG to 
> Config.APPLICATION_ID,
> StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
> StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
> ))
> streams = KafkaStreams(builder, config)
>
> Runtime.getRuntime().addShutdownHook(Thread {
> println("Shutting down Kafka Streams...")
> streams.close()
> println("Shut down successfully")
> })
> }
>
> fun run() {
> Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1, 
> Config.REPLICATION_FACTOR, true)
> Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT, 
> Config.REPLICATION_FACTOR, false)
> streams.start()
> }
>
>
> *Environment Details:* 1 ZooKeeper, 2 brokers, and 1 or 2 application
> instances.
>
>
> So I just wanted to understand the process of state store restoration while
> scaling up and down. How does Streams manage to restore the data? I was
> expecting that when the new instance is launched, the data would flow
> through the same processor, so that it gets modified using the same logic
> that was applied when it was stored on instance 1. Could you please help me
> understand this a little better? Please let me know if there is any way to
> get the restoration process to route the data via the same processor.
>
>
> Thanks,
> Tony
>


Kafka Streams : Problem with Global State Restoration

2017-10-18 Thread Tony John
Hello All,

I have been trying to create an application on top of Kafka Streams. I am a
newbie to Kafka & Kafka Streams, so please excuse me if my understanding is
wrong.

I got the application running fine on a single EC2 instance in AWS. Now I am
looking at scaling and ran into some issues. The application has a global
state store and a couple of other local ones backed by RocksDB. It uses the
Processor API, and the stream is built using the TopologyBuilder. The global
state store is fed by a topic which sends key-value pairs (both protobuf
objects) and is connected to a processor which transforms the value by
applying some logic and finally stores the key and the modified value in the
store. Similarly, the local stores are connected via processors which are fed
by different topics. Now the issue is that when I launch a new instance of
the app, task re-allocation and state restoration happen, and the stores get
replicated onto the new instance. But the global store which is replicated
onto the new instance has some other data (I guess that's the raw data) as
opposed to the processed data.

*Application Topology*

*Global Store*

Source Topic (Partition Count = 1, Replication Factor = 2, Compacted =
false) -> GlobalStoreProcessor (Persistent, Caching enabled, logging
disabled) -> Global Store

*Local Store*

Source Topic (Partition Count = 16, Replication Factor = 2, Compacted =
true)

 -> LocalStoreProcessor (
Persistent, Caching enabled, Logging enabled

) -> Local state stores on different partitions

*Sample Code (Written in Kotlin)*

val streams: KafkaStreams
init {
val builder = KStreamBuilder().apply {

val globalStore = Stores.create(Config.DICTIONARY)
.withKeys(Serdes.String())
.withValues(Serdes.String())
.persistent()
.enableCaching()
.disableLogging()
.build() as StateStoreSupplier<KeyValueStore<String, String>>

addGlobalStore(globalStore, "dictionary-words-source",
Serdes.String().deserializer(), Serdes.String().deserializer(),
Config.DICTIONARY_WORDS_TOPIC,
"dictionary-words-processor", DictionaryWordsProcessor.Supplier)


addSource("query-source", Serdes.String().deserializer(),
Serdes.String().deserializer(), Config.QUERIES_TOPIC)
addProcessor("query-processor", QueryProcessor.Supplier, "query-source")

}

val config =
StreamsConfig(mapOf(StreamsConfig.APPLICATION_ID_CONFIG to
Config.APPLICATION_ID,
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to Config.KAFKA_SERVERS,
StreamsConfig.STATE_DIR_CONFIG to Config.STATE_STORE_DIR
))
streams = KafkaStreams(builder, config)

Runtime.getRuntime().addShutdownHook(Thread {
println("Shutting down Kafka Streams...")
streams.close()
println("Shut down successfully")
})
}

fun run() {
Utils.createTopic(Config.DICTIONARY_WORDS_TOPIC, 1,
Config.REPLICATION_FACTOR, true)
Utils.createTopic(Config.QUERIES_TOPIC, Config.PARTITION_COUNT,
Config.REPLICATION_FACTOR, false)
streams.start()
}


*Environment Details:* 1 ZooKeeper, 2 brokers, and 1 or 2 application
instances.


So I just wanted to understand the process of state store restoration while
scaling up and down. How does Streams manage to restore the data? I was
expecting that when the new instance is launched, the data would flow through
the same processor, so that it gets modified using the same logic that was
applied when it was stored on instance 1. Could you please help me understand
this a little better? Please let me know if there is any way to get the
restoration process to route the data via the same processor.
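
For context on the behaviour described above: as far as I understand, a
global state store is restored by copying the records of its source topic
directly into the store, bypassing the attached processor, which is why the
restored store contains the raw topic data. One workaround sketch, assuming
the transformation can be moved in front of the topic: do the enrichment in a
separate step that produces already-processed values to the dictionary topic,
and keep the global store's processor as a plain pass-through so live updates
and restored data look the same. The class below is illustrative only (it
reuses Config.DICTIONARY from the code above):

// imports assumed:
// org.apache.kafka.streams.processor.AbstractProcessor
// org.apache.kafka.streams.processor.ProcessorContext
// org.apache.kafka.streams.state.KeyValueStore

// Sketch only: pass-through processor, so the global store's source topic and
// the store contents stay identical; the real transformation happens upstream,
// before the record is written to Config.DICTIONARY_WORDS_TOPIC.
class PassThroughStoreProcessor : AbstractProcessor<String, String>() {
    private lateinit var store: KeyValueStore<String, String>

    override fun init(context: ProcessorContext) {
        super.init(context)
        @Suppress("UNCHECKED_CAST")
        store = context.getStateStore(Config.DICTIONARY) as KeyValueStore<String, String>
    }

    override fun process(key: String, value: String) {
        store.put(key, value)   // no transformation here; topic contents == store contents
    }
}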


Thanks,
Tony