Re: Need broker side property which can restrict a fetch.size

2018-01-09 Thread Srinath C
Hi Shivam,

You can configure quotas per user/client.

[1] https://kafka.apache.org/documentation/#design_quotas
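
For example, a consumer byte-rate quota can be set per client id with
kafka-configs.sh. A minimal sketch (the ZooKeeper address, client id and
10 MB/s limit below are placeholders, not values from this thread):

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'consumer_byte_rate=10485760' \
  --entity-type clients --entity-name my-consumer-client

Note that quotas throttle the overall byte rate per client/user; they do not
cap the size of an individual fetch request.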

Regards,
Srinath

On Jan 9, 2018 5:19 PM, "Shivam Sharma" <28shivamsha...@gmail.com> wrote:

> Hi,
>
> We have a Kafka cluster (0.11.0.0) of 3 machines. When we increase
> *max.fetch.size.per.partition* to around 50 MB for a topic with 48
> partitions, our Kafka broker runs out of memory (OOM).
>
> Our cluster has many consumers, so restricting the fetch size on the
> consumer side is very difficult; because consumers control this setting, we
> keep running into OOM-like issues.
>
> Can anything be done on the broker side to avoid these issues by
> restricting clients?
>
> Thanks
>
> --
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn: https://www.linkedin.com/in/28shivamsharma
>


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

2018-01-09 Thread Sameer Kumar
Ok, Matthias. Thanks for correcting.

On Wed, Jan 10, 2018 at 3:18 AM, Matthias J. Sax 
wrote:

> Sameer,
>
> the KIP you are pointing to is not related to Kafka Streams'
> task/partition assignment. Kafka Streams uses its own implementation of
> a partition assignor (not the default one the consumer uses).
>
> -Matthias
>
> On 1/9/18 4:22 AM, Sameer Kumar wrote:
> > Got It. Thanks. Others can also take a look at
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 54+-+Sticky+Partition+Assignment+Strategy
> >
> > -Sameer.
> >
> > On Tue, Jan 9, 2018 at 5:33 PM, Damian Guy  wrote:
> >
> >> Hi,
> >>
> >> yes partition assignment is aware of the standby replicas. It will try
> and
> >> assign tasks to the nodes that have the state for the task, but also
> will
> >> try and keep the assignment balanced.
> >> So the assignment will be more like your second assignment. If you are
> >> interested you can have a look at:
> >> https://github.com/apache/kafka/blob/trunk/streams/src/
> >> test/java/org/apache/kafka/streams/processor/internals/assignment/
> >> StickyTaskAssignorTest.java
> >>
> >>
> >> On Tue, 9 Jan 2018 at 11:44 Sameer Kumar 
> wrote:
> >>
> >>> Hi Damian,
> >>>
> >>> Thanks for your reply. I have some further ques.
> >>>
> >>> Would the partition assignment be aware of the standby replicas. What
> >> would
> >>> be the preference for task distribution: load balancing or stand by
> >>> replicas.
> >>>
> >>> For e.g
> >>>
> >>> N1
> >>> assigned partitions: 1,2
> >>> standby partitions: 5,6
> >>>
> >>> N2
> >>> assigned partitions: 3,4
> >>> standby partitions: 1,2
> >>>
> >>> N3
> >>> assigned partitions: 5,6
> >>> standby partitions: 3,4
> >>>
> >>> After N1 goes down, what would be the state of the cluster
> >>>
> >>> N2
> >>> assigned partitions: 3,4,1,2
> >>> standby partitions: 5,6
> >>>
> >>> N3
> >>> assigned partitions: 5,6
> >>> standby partitions: 3,4,1,2
> >>>
> >>> Or
> >>>
> >>> N2
> >>> assigned partitions: 3,4,1
> >>> standby partitions: 2,5,6
> >>>
> >>> N3
> >>> assigned partitions: 5,6,2
> >>> standby partitions: 1,3,4
> >>>
> >>> -Sameer.
> >>>
> >>> On Tue, Jan 9, 2018 at 2:27 PM, Damian Guy 
> wrote:
> >>>
>  On Tue, 9 Jan 2018 at 07:42 Sameer Kumar 
> >> wrote:
> 
> > Hi,
> >
> > I would like to understand how does rebalance affect state stores
> > migration. If I have a cluster of 3 nodes, and 1 goes down, the
>  partitions
> > for node3 gets assigned to node1 and node2, does the rocksdb on
>  node1/node2
> > also starts updating its store from changelog topic.
> >
> >
>  Yes the stores will be migrated to node1 and node2 and they will be
>  restored from the changelog topic
> 
> 
> > If yes, then what impact would this migration process have on
> >> querying.
> >
> 
>  You can't query the stores until they have all been restored and the
>  rebalance ends.
> 
> >
> > Also, if the state store restoration process takes time, how to make
> >>> sure
> > another rebalance doesn't happen.
> >
> >
>  If you don't lose any more nodes then another rebalance won't happen.
> >> If
>  node1 comes back online, then there will be another rebalance, however
> >>> the
>  time taken shouldn't be as long as it will already have most of the
> >> state
>  locally, so it only needs to catch up with the remainder of the
> >>> changelog.
>  Additionally, you should run with standby tasks. They are updated in
> >> the
>  background and will mean that in the event of failure the other nodes
>  should already have most of the state locally, so the restoration
> >> process
>  won't take so long
> 
> 
> > -Sameer.
> >
> 
> >>>
> >>
> >
>
>


Re: Kafka per topic retention.bytes and global log.retention.bytes not working

2018-01-09 Thread Wim Van Leuven
Upgrade?

On Wed, Jan 10, 2018, 00:26 Thunder Stumpges 
wrote:

> How would I know if we are seeing that issue? We are running 0.11.0.0 so we
> would not have this fix.
>
> On Tue, Jan 9, 2018 at 11:07 AM Wim Van Leuven <
> wim.vanleu...@highestpoint.biz> wrote:
>
> > What minor version of Kafka are you running? Might you be impacted by
> > https://issues.apache.org/jira/browse/KAFKA-6030?
> > -w
> >
> > On Tue, 9 Jan 2018 at 19:02 Thunder Stumpges  >
> > wrote:
> >
> > > Hello, I posted this on StackOverflow
> > > <
> > >
> >
> https://stackoverflow.com/questions/47948399/kafka-per-topic-retention-bytes-and-global-log-retention-bytes-not-working
> > > >also
> > > but haven't gotten any response.
> > >
> > > thanks in advance,
> > > Thunder
> > > __
> > >
> > > We are running a 6 node cluster of kafka 0.11.0. We have set a global
> as
> > > well as a per-topic retention in bytes, neither of which is being
> > applied.
> > > There are no errors that I can see in the logs, just nothing being
> > deleted
> > > (by size; the time retention does seem to be working)
> > >
> > > See relevant configs below:
> > >
> > > *./config/server.properties* :
> > >
> > > # global retention 75GB or 60 days, segment size 512MB
> > > log.retention.bytes=750
> > > log.retention.check.interval.ms=6
> > >
> > > log.retention.hours=1440
> > >
> > > log.cleanup.policy=delete
> > >
> > > log.segment.bytes=536870912
> > >
> > > *topic configuration (30GB):*
> > >
> > > [tstumpges@kafka-02 kafka]$ bin/kafka-topics.sh  --zookeeper
> > > zk-01:2181/kafka --describe --topic stg_logtopic
> > > Topic:stg_logtopicPartitionCount:12   ReplicationFactor:3
> > > Configs:retention.bytes=300
> > > Topic: stg_logtopic   Partition: 0Leader: 4
> > > Replicas: 4,5,6 Isr: 4,5,6
> > > Topic: stg_logtopic   Partition: 1Leader: 5
> > > Replicas: 5,6,1 Isr: 5,1,6
> > > ...
> > >
> > > And, disk usage showing 910GB usage for one partition!
> > >
> > > [tstumpges@kafka-02 kafka]$ sudo du -s -h /data1/kafka-data/*
> > > 82G /data1/kafka-data/stg_logother3-2
> > > 155G/data1/kafka-data/stg_logother2-9
> > > 169G/data1/kafka-data/stg_logother1-6
> > > 910G/data1/kafka-data/stg_logtopic-4
> > >
> > > I can see there are plenty of segment log files (512MB each) in the
> > > partition directory... what is going on?!
> > >
> > > Thanks in advance, Thunder
> > >
> >
>


Re: Best practice for publishing byte messages to Kafka

2018-01-09 Thread Thunder Stumpges
A byte array is essentially "serialized" already, isn't it? The message
itself is sent as a byte array, so the default byte array serializer is as
"efficient" as it gets: it just passes your byte array through as the
message, so there's no serialization work happening.
-Thunder

On Tue, Jan 9, 2018 at 8:17 PM Ali Nazemian  wrote:

> Thanks, Matt. Have you done any benchmarking to see how using different
> Serializers may impact throughput/latency?
>
> Regards,
> Ali
>
> On Wed, Jan 10, 2018 at 7:55 AM, Matt Farmer  wrote:
>
> > We use the default byte array serializer provided with Kafka and it works
> > great for us.
> >
> > > On Jan 9, 2018, at 8:12 AM, Ali Nazemian 
> wrote:
> > >
> > > Hi All,
> > >
> > > I was wondering whether there is any best practice/recommendation for
> > > publishing byte messages to Kafka. Is there any specific Serializer
> that
> > is
> > > recommended for this matter?
> > >
> > > Cheers,
> > > Ali
> >
> >
>
>
> --
> A.Nazemian
>


Re: Best practice for publishing byte messages to Kafka

2018-01-09 Thread Ali Nazemian
Thanks, Matt. Have you done any benchmarking to see how using different
Serializers may impact throughput/latency?

Regards,
Ali

On Wed, Jan 10, 2018 at 7:55 AM, Matt Farmer  wrote:

> We use the default byte array serializer provided with Kafka and it works
> great for us.
>
> > On Jan 9, 2018, at 8:12 AM, Ali Nazemian  wrote:
> >
> > Hi All,
> >
> > I was wondering whether there is any best practice/recommendation for
> > publishing byte messages to Kafka. Is there any specific Serializer that
> is
> > recommended for this matter?
> >
> > Cheers,
> > Ali
>
>


-- 
A.Nazemian


Triggers for offline partition

2018-01-09 Thread Di Shang
Hi

We have several small Kafka clusters (0.10.2) of 4 nodes each, running on
cloud-hosted VMs (each node runs on a separate VM). We are having an issue
where random partitions go offline intermittently every few days, at a fixed
time (8 am), in one particular datacenter (identical clusters in other
datacenters work fine). Most partitions have 2-3 replicas. Each time
partitions go offline, they do not auto-recover and we have to do a rolling
restart of the cluster to recover them.

We suspect some kind of daily scheduled activity at the VM/hardware level in
that datacenter is causing the offline partitions, but we could not find
anything suspicious.

Can someone help me understand under what conditions a partition would go
offline and be unable to auto-recover even though the trigger seems to be
transient, and what kind of external factor (OS, hard disk, network, etc.)
could cause that? Is there any logging we can enable to debug this?
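
For reference, partitions whose leader is unavailable can be listed directly
with kafka-topics.sh (a sketch; the ZooKeeper address is a placeholder):

bin/kafka-topics.sh --zookeeper zk-host:2181 --describe --unavailable-partitions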


Thanks,

Di Shang

--

Australian Development Lab
L9 IBM Centre, 601 Pacific Hwy, St Leonards 2065 NSW Australia
shan...@au1.ibm.com



Re: Kafka per topic retention.bytes and global log.retention.bytes not working

2018-01-09 Thread Thunder Stumpges
How would I know if we are seeing that issue? We are running 0.11.0.0 so we
would not have this fix.

On Tue, Jan 9, 2018 at 11:07 AM Wim Van Leuven <
wim.vanleu...@highestpoint.biz> wrote:

> What minor version of Kafka are you running? Might you be impacted by
> https://issues.apache.org/jira/browse/KAFKA-6030?
> -w
>
> On Tue, 9 Jan 2018 at 19:02 Thunder Stumpges 
> wrote:
>
> > Hello, I posted this on StackOverflow
> > <
> >
> https://stackoverflow.com/questions/47948399/kafka-per-topic-retention-bytes-and-global-log-retention-bytes-not-working
> > >also
> > but haven't gotten any response.
> >
> > thanks in advance,
> > Thunder
> > __
> >
> > We are running a 6 node cluster of kafka 0.11.0. We have set a global as
> > well as a per-topic retention in bytes, neither of which is being
> applied.
> > There are no errors that I can see in the logs, just nothing being
> deleted
> > (by size; the time retention does seem to be working)
> >
> > See relevant configs below:
> >
> > *./config/server.properties* :
> >
> > # global retention 75GB or 60 days, segment size 512MB
> > log.retention.bytes=750
> > log.retention.check.interval.ms=6
> >
> > log.retention.hours=1440
> >
> > log.cleanup.policy=delete
> >
> > log.segment.bytes=536870912
> >
> > *topic configuration (30GB):*
> >
> > [tstumpges@kafka-02 kafka]$ bin/kafka-topics.sh  --zookeeper
> > zk-01:2181/kafka --describe --topic stg_logtopic
> > Topic:stg_logtopicPartitionCount:12   ReplicationFactor:3
> > Configs:retention.bytes=300
> > Topic: stg_logtopic   Partition: 0Leader: 4
> > Replicas: 4,5,6 Isr: 4,5,6
> > Topic: stg_logtopic   Partition: 1Leader: 5
> > Replicas: 5,6,1 Isr: 5,1,6
> > ...
> >
> > And, disk usage showing 910GB usage for one partition!
> >
> > [tstumpges@kafka-02 kafka]$ sudo du -s -h /data1/kafka-data/*
> > 82G /data1/kafka-data/stg_logother3-2
> > 155G/data1/kafka-data/stg_logother2-9
> > 169G/data1/kafka-data/stg_logother1-6
> > 910G/data1/kafka-data/stg_logtopic-4
> >
> > I can see there are plenty of segment log files (512MB each) in the
> > partition directory... what is going on?!
> >
> > Thanks in advance, Thunder
> >
>


Broker won't exit...

2018-01-09 Thread Skip Montanaro
I only discovered the kafka-server-stop.sh script a couple days ago. I
can't seem to make it do its thing (the corresponding zookeeper stop
script seems to work just fine). All consumers have been stopped. Lsof
still shows the Kafka broker process listening on its port. The last
connection left the CLOSE_WAIT state several minutes ago. Gstack shows
169 threads, most in pthread_cond_wait(), a handful in other wait-like
functions (sem_wait, pthread_join, pthread_cond_timedwait, poll,
epoll_wait). I'm running 2.11-1.0.0 on a Red Hat 6 server.

What does it take to get a broker to exit (short of kill -9)?
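
For reference, the broker runs a controlled shutdown when its JVM receives
SIGTERM; kill -9 bypasses that hook. A sketch of sending the signal by hand
(the grep pattern assumes the broker's main class, kafka.Kafka, appears on
the java command line):

# Find the broker PID and ask it to shut down cleanly
BROKER_PID=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
kill -s TERM "$BROKER_PID"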

Thx,

Skip Montanaro


Upgrading Kafka from Version 0.10.2 to 1.0.0

2018-01-09 Thread ZigSphere Tech
Hello All,

Is it easy to upgrade from Kafka version 0.10.2 to 1.0.0 or do I need to
upgrade to version 0.11.0 first? Anything to expect?
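
For reference, the documented rolling-upgrade procedure revolves around two
broker settings; a sketch of the first phase, assuming the cluster currently
runs 0.10.2 (version strings are examples):

# server.properties while rolling out the new binaries
inter.broker.protocol.version=0.10.2
log.message.format.version=0.10.2
# Once every broker runs the new binaries, bump inter.broker.protocol.version
# (and later log.message.format.version) to 1.0 and do another rolling restart.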

Thanks


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

2018-01-09 Thread Matthias J. Sax
Sameer,

the KIP you are pointing to is not related to Kafka Streams'
task/partition assignment. Kafka Streams uses its own implementation of
a partition assignor (not the default one the consumer uses).

-Matthias

On 1/9/18 4:22 AM, Sameer Kumar wrote:
> Got It. Thanks. Others can also take a look at
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy
> 
> -Sameer.
> 
> On Tue, Jan 9, 2018 at 5:33 PM, Damian Guy  wrote:
> 
>> Hi,
>>
>> yes partition assignment is aware of the standby replicas. It will try and
>> assign tasks to the nodes that have the state for the task, but also will
>> try and keep the assignment balanced.
>> So the assignment will be more like your second assignment. If you are
>> interested you can have a look at:
>> https://github.com/apache/kafka/blob/trunk/streams/src/
>> test/java/org/apache/kafka/streams/processor/internals/assignment/
>> StickyTaskAssignorTest.java
>>
>>
>> On Tue, 9 Jan 2018 at 11:44 Sameer Kumar  wrote:
>>
>>> Hi Damian,
>>>
>>> Thanks for your reply. I have some further ques.
>>>
>>> Would the partition assignment be aware of the standby replicas. What
>> would
>>> be the preference for task distribution: load balancing or stand by
>>> replicas.
>>>
>>> For e.g
>>>
>>> N1
>>> assigned partitions: 1,2
>>> standby partitions: 5,6
>>>
>>> N2
>>> assigned partitions: 3,4
>>> standby partitions: 1,2
>>>
>>> N3
>>> assigned partitions: 5,6
>>> standby partitions: 3,4
>>>
>>> After N1 goes down, what would be the state of the cluster
>>>
>>> N2
>>> assigned partitions: 3,4,1,2
>>> standby partitions: 5,6
>>>
>>> N3
>>> assigned partitions: 5,6
>>> standby partitions: 3,4,1,2
>>>
>>> Or
>>>
>>> N2
>>> assigned partitions: 3,4,1
>>> standby partitions: 2,5,6
>>>
>>> N3
>>> assigned partitions: 5,6,2
>>> standby partitions: 1,3,4
>>>
>>> -Sameer.
>>>
>>> On Tue, Jan 9, 2018 at 2:27 PM, Damian Guy  wrote:
>>>
 On Tue, 9 Jan 2018 at 07:42 Sameer Kumar 
>> wrote:

> Hi,
>
> I would like to understand how does rebalance affect state stores
> migration. If I have a cluster of 3 nodes, and 1 goes down, the
 partitions
> for node3 gets assigned to node1 and node2, does the rocksdb on
 node1/node2
> also starts updating its store from changelog topic.
>
>
 Yes the stores will be migrated to node1 and node2 and they will be
 restored from the changelog topic


> If yes, then what impact would this migration process have on
>> querying.
>

 You can't query the stores until they have all been restored and the
 rebalance ends.

>
> Also, if the state store restoration process takes time, how to make
>>> sure
> another rebalance doesn't happen.
>
>
 If you don't lose any more nodes then another rebalance won't happen.
>> If
 node1 comes back online, then there will be another rebalance, however
>>> the
 time taken shouldn't be as long as it will already have most of the
>> state
 locally, so it only needs to catch up with the remainder of the
>>> changelog.
 Additionally, you should run with standby tasks. They are updated in
>> the
 background and will mean that in the event of failure the other nodes
 should already have most of the state locally, so the restoration
>> process
 won't take so long


> -Sameer.
>

>>>
>>
> 





Re: Best practice for publishing byte messages to Kafka

2018-01-09 Thread Matt Farmer
We use the default byte array serializer provided with Kafka and it works great 
for us.
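
For reference, a minimal sketch of a producer wired up with the built-in
ByteArraySerializer (the broker address and topic name are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class BytePublisher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");               // placeholder
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            byte[] payload = new byte[] {0x01, 0x02, 0x03};           // already-encoded bytes
            // The serializer is a pass-through: the byte[] goes onto the wire as-is.
            producer.send(new ProducerRecord<>("my-byte-topic", payload));
        }
    }
}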

> On Jan 9, 2018, at 8:12 AM, Ali Nazemian  wrote:
> 
> Hi All,
> 
> I was wondering whether there is any best practice/recommendation for
> publishing byte messages to Kafka. Is there any specific Serializer that is
> recommended for this matter?
> 
> Cheers,
> Ali



Re: Kafka per topic retention.bytes and global log.retention.bytes not working

2018-01-09 Thread Wim Van Leuven
What minor version of Kafka are you running? Might you be impacted by
https://issues.apache.org/jira/browse/KAFKA-6030?
-w

On Tue, 9 Jan 2018 at 19:02 Thunder Stumpges 
wrote:

> Hello, I posted this on StackOverflow
> <
> https://stackoverflow.com/questions/47948399/kafka-per-topic-retention-bytes-and-global-log-retention-bytes-not-working
> >also
> but haven't gotten any response.
>
> thanks in advance,
> Thunder
> __
>
> We are running a 6 node cluster of kafka 0.11.0. We have set a global as
> well as a per-topic retention in bytes, neither of which is being applied.
> There are no errors that I can see in the logs, just nothing being deleted
> (by size; the time retention does seem to be working)
>
> See relevant configs below:
>
> *./config/server.properties* :
>
> # global retention 75GB or 60 days, segment size 512MB
> log.retention.bytes=750
> log.retention.check.interval.ms=6
>
> log.retention.hours=1440
>
> log.cleanup.policy=delete
>
> log.segment.bytes=536870912
>
> *topic configuration (30GB):*
>
> [tstumpges@kafka-02 kafka]$ bin/kafka-topics.sh  --zookeeper
> zk-01:2181/kafka --describe --topic stg_logtopic
> Topic:stg_logtopicPartitionCount:12   ReplicationFactor:3
> Configs:retention.bytes=300
> Topic: stg_logtopic   Partition: 0Leader: 4
> Replicas: 4,5,6 Isr: 4,5,6
> Topic: stg_logtopic   Partition: 1Leader: 5
> Replicas: 5,6,1 Isr: 5,1,6
> ...
>
> And, disk usage showing 910GB usage for one partition!
>
> [tstumpges@kafka-02 kafka]$ sudo du -s -h /data1/kafka-data/*
> 82G /data1/kafka-data/stg_logother3-2
> 155G/data1/kafka-data/stg_logother2-9
> 169G/data1/kafka-data/stg_logother1-6
> 910G/data1/kafka-data/stg_logtopic-4
>
> I can see there are plenty of segment log files (512MB each) in the
> partition directory... what is going on?!
>
> Thanks in advance, Thunder
>


Kafka per topic retention.bytes and global log.retention.bytes not working

2018-01-09 Thread Thunder Stumpges
Hello, I posted this on StackOverflow
<https://stackoverflow.com/questions/47948399/kafka-per-topic-retention-bytes-and-global-log-retention-bytes-not-working>
as well, but haven't gotten any response.

thanks in advance,
Thunder
__

We are running a 6-node cluster of Kafka 0.11.0. We have set a global as
well as a per-topic retention in bytes, neither of which is being applied.
There are no errors that I can see in the logs; nothing is being deleted by
size (the time-based retention does seem to be working).

See relevant configs below:

*./config/server.properties* :

# global retention 75GB or 60 days, segment size 512MB
log.retention.bytes=750
log.retention.check.interval.ms=6

log.retention.hours=1440

log.cleanup.policy=delete

log.segment.bytes=536870912

*topic configuration (30GB):*

[tstumpges@kafka-02 kafka]$ bin/kafka-topics.sh  --zookeeper
zk-01:2181/kafka --describe --topic stg_logtopic
Topic:stg_logtopicPartitionCount:12   ReplicationFactor:3
Configs:retention.bytes=300
Topic: stg_logtopic   Partition: 0Leader: 4
Replicas: 4,5,6 Isr: 4,5,6
Topic: stg_logtopic   Partition: 1Leader: 5
Replicas: 5,6,1 Isr: 5,1,6
...

And, disk usage showing 910GB usage for one partition!

[tstumpges@kafka-02 kafka]$ sudo du -s -h /data1/kafka-data/*
82G /data1/kafka-data/stg_logother3-2
155G/data1/kafka-data/stg_logother2-9
169G/data1/kafka-data/stg_logother1-6
910G/data1/kafka-data/stg_logtopic-4

I can see there are plenty of segment log files (512MB each) in the
partition directory... what is going on?!

Thanks in advance, Thunder
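
One way to double-check the override the brokers actually see is
kafka-configs.sh (a sketch reusing the ZooKeeper connect string from above):

bin/kafka-configs.sh --zookeeper zk-01:2181/kafka --describe \
  --entity-type topics --entity-name stg_logtopic

Keep in mind that retention.bytes applies per partition, not to the topic as
a whole.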


UNKNOWN_TOPIC_OR_PARTITION for kafka producer

2018-01-09 Thread chidigam .
Hi All,
I have SSL from client to broker, SSL again for broker-to-broker
communication, and Kerberos from broker to ZK.
The topics exist when I do a describe from the ZK CLI.
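
For context, the client side of a setup like this is typically driven by a
properties file passed to the CLI; a sketch with placeholder hosts and paths
(not the actual values from this cluster):

# client-ssl.properties
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=changeit
# only if the broker requires client authentication:
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=changeit

bin/kafka-console-producer.sh --broker-list broker1:9093 --topic my-topic \
  --producer.config client-ssl.properties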

The authorization log clearly says the principal is allowed to describe and
write to the topic.

When producing a message with the Kafka producer CLI, I am getting an
UNKNOWN_TOPIC_OR_PARTITION error.

I am unsure how to troubleshoot the problem.
Could anyone please give a suggestion or point me in the right direction?

Regards
bhanu


Best practice for publishing byte messages to Kafka

2018-01-09 Thread Ali Nazemian
Hi All,

I was wondering whether there is any best practice/recommendation for
publishing byte messages to Kafka. Is there any specific Serializer that is
recommended for this matter?

Cheers,
Ali


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

2018-01-09 Thread Sameer Kumar
Got It. Thanks. Others can also take a look at
https://cwiki.apache.org/confluence/display/KAFKA/KIP-54+-+Sticky+Partition+Assignment+Strategy

-Sameer.

On Tue, Jan 9, 2018 at 5:33 PM, Damian Guy  wrote:

> Hi,
>
> yes partition assignment is aware of the standby replicas. It will try and
> assign tasks to the nodes that have the state for the task, but also will
> try and keep the assignment balanced.
> So the assignment will be more like your second assignment. If you are
> interested you can have a look at:
> https://github.com/apache/kafka/blob/trunk/streams/src/
> test/java/org/apache/kafka/streams/processor/internals/assignment/
> StickyTaskAssignorTest.java
>
>
> On Tue, 9 Jan 2018 at 11:44 Sameer Kumar  wrote:
>
> > Hi Damian,
> >
> > Thanks for your reply. I have some further ques.
> >
> > Would the partition assignment be aware of the standby replicas. What
> would
> > be the preference for task distribution: load balancing or stand by
> > replicas.
> >
> > For e.g
> >
> > N1
> > assigned partitions: 1,2
> > standby partitions: 5,6
> >
> > N2
> > assigned partitions: 3,4
> > standby partitions: 1,2
> >
> > N3
> > assigned partitions: 5,6
> > standby partitions: 3,4
> >
> > After N1 goes down, what would be the state of the cluster
> >
> > N2
> > assigned partitions: 3,4,1,2
> > standby partitions: 5,6
> >
> > N3
> > assigned partitions: 5,6
> > standby partitions: 3,4,1,2
> >
> > Or
> >
> > N2
> > assigned partitions: 3,4,1
> > standby partitions: 2,5,6
> >
> > N3
> > assigned partitions: 5,6,2
> > standby partitions: 1,3,4
> >
> > -Sameer.
> >
> > On Tue, Jan 9, 2018 at 2:27 PM, Damian Guy  wrote:
> >
> > > On Tue, 9 Jan 2018 at 07:42 Sameer Kumar 
> wrote:
> > >
> > > > Hi,
> > > >
> > > > I would like to understand how does rebalance affect state stores
> > > > migration. If I have a cluster of 3 nodes, and 1 goes down, the
> > > partitions
> > > > for node3 gets assigned to node1 and node2, does the rocksdb on
> > > node1/node2
> > > > also starts updating its store from changelog topic.
> > > >
> > > >
> > > Yes the stores will be migrated to node1 and node2 and they will be
> > > restored from the changelog topic
> > >
> > >
> > > > If yes, then what impact would this migration process have on
> querying.
> > > >
> > >
> > > You can't query the stores until they have all been restored and the
> > > rebalance ends.
> > >
> > > >
> > > > Also, if the state store restoration process takes time, how to make
> > sure
> > > > another rebalance doesn't happen.
> > > >
> > > >
> > > If you don't lose any more nodes then another rebalance won't happen.
> If
> > > node1 comes back online, then there will be another rebalance, however
> > the
> > > time taken shouldn't be as long as it will already have most of the
> state
> > > locally, so it only needs to catch up with the remainder of the
> > changelog.
> > > Additionally, you should run with standby tasks. They are updated in
> the
> > > background and will mean that in the event of failure the other nodes
> > > should already have most of the state locally, so the restoration
> process
> > > won't take so long
> > >
> > >
> > > > -Sameer.
> > > >
> > >
> >
>


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

2018-01-09 Thread Damian Guy
Hi,

yes partition assignment is aware of the standby replicas. It will try and
assign tasks to the nodes that have the state for the task, but also will
try and keep the assignment balanced.
So the assignment will be more like your second assignment. If you are
interested you can have a look at:
https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java


On Tue, 9 Jan 2018 at 11:44 Sameer Kumar  wrote:

> Hi Damian,
>
> Thanks for your reply. I have some further ques.
>
> Would the partition assignment be aware of the standby replicas. What would
> be the preference for task distribution: load balancing or stand by
> replicas.
>
> For e.g
>
> N1
> assigned partitions: 1,2
> standby partitions: 5,6
>
> N2
> assigned partitions: 3,4
> standby partitions: 1,2
>
> N3
> assigned partitions: 5,6
> standby partitions: 3,4
>
> After N1 goes down, what would be the state of the cluster
>
> N2
> assigned partitions: 3,4,1,2
> standby partitions: 5,6
>
> N3
> assigned partitions: 5,6
> standby partitions: 3,4,1,2
>
> Or
>
> N2
> assigned partitions: 3,4,1
> standby partitions: 2,5,6
>
> N3
> assigned partitions: 5,6,2
> standby partitions: 1,3,4
>
> -Sameer.
>
> On Tue, Jan 9, 2018 at 2:27 PM, Damian Guy  wrote:
>
> > On Tue, 9 Jan 2018 at 07:42 Sameer Kumar  wrote:
> >
> > > Hi,
> > >
> > > I would like to understand how rebalancing affects state store
> > > migration. If I have a cluster of 3 nodes and 1 goes down, so that the
> > > partitions for node3 get assigned to node1 and node2, does the rocksdb
> > > on node1/node2 also start updating its store from the changelog topic?
> > >
> > >
> > Yes the stores will be migrated to node1 and node2 and they will be
> > restored from the changelog topic
> >
> >
> > > If yes, then what impact would this migration process have on querying.
> > >
> >
> > You can't query the stores until they have all been restored and the
> > rebalance ends.
> >
> > >
> > > Also, if the state store restoration process takes time, how to make
> sure
> > > another rebalance doesn't happen.
> > >
> > >
> > If you don't lose any more nodes then another rebalance won't happen. If
> > node1 comes back online, then there will be another rebalance, however
> the
> > time taken shouldn't be as long as it will already have most of the state
> > locally, so it only needs to catch up with the remainder of the
> changelog.
> > Additionally, you should run with standby tasks. They are updated in the
> > background and will mean that in the event of failure the other nodes
> > should already have most of the state locally, so the restoration process
> > won't take so long
> >
> >
> > > -Sameer.
> > >
> >
>


Need broker side property which can restrict a fetch.size

2018-01-09 Thread Shivam Sharma
Hi,

We have a Kafka cluster (0.11.0.0) of 3 machines. When we increase
*max.fetch.size.per.partition* to around 50 MB for a topic with 48
partitions, our Kafka broker runs out of memory (OOM).

Our cluster has many consumers, so restricting the fetch size on the
consumer side is very difficult; because consumers control this setting, we
keep running into OOM-like issues.

Can anything be done on the broker side to avoid these issues by
restricting clients?

Thanks

-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsha...@gmail.com
LinkedIn: https://www.linkedin.com/in/28shivamsharma


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

2018-01-09 Thread Sameer Kumar
Hi Damian,

Thanks for your reply. I have some further ques.

Would the partition assignment be aware of the standby replicas. What would
be the preference for task distribution: load balancing or stand by
replicas.

For e.g

N1
assigned partitions: 1,2
standby partitions: 5,6

N2
assigned partitions: 3,4
standby partitions: 1,2

N3
assigned partitions: 5,6
standby partitions: 3,4

After N1 goes down, what would be the state of the cluster

N2
assigned partitions: 3,4,1,2
standby partitions: 5,6

N3
assigned partitions: 5,6
standby partitions: 3,4,1,2

Or

N2
assigned partitions: 3,4,1
standby partitions: 2,5,6

N3
assigned partitions: 5,6,2
standby partitions: 1,3,4

-Sameer.

On Tue, Jan 9, 2018 at 2:27 PM, Damian Guy  wrote:

> On Tue, 9 Jan 2018 at 07:42 Sameer Kumar  wrote:
>
> > Hi,
> >
> > I would like to understand how rebalancing affects state store
> > migration. If I have a cluster of 3 nodes and 1 goes down, so that the
> > partitions for node3 get assigned to node1 and node2, does the rocksdb
> > on node1/node2 also start updating its store from the changelog topic?
> >
> >
> Yes the stores will be migrated to node1 and node2 and they will be
> restored from the changelog topic
>
>
> > If yes, then what impact would this migration process have on querying.
> >
>
> You can't query the stores until they have all been restored and the
> rebalance ends.
>
> >
> > Also, if the state store restoration process takes time, how to make sure
> > another rebalance doesn't happen.
> >
> >
> If you don't lose any more nodes then another rebalance won't happen. If
> node1 comes back online, then there will be another rebalance, however the
> time taken shouldn't be as long as it will already have most of the state
> locally, so it only needs to catch up with the remainder of the changelog.
> Additionally, you should run with standby tasks. They are updated in the
> background and will mean that in the event of failure the other nodes
> should already have most of the state locally, so the restoration process
> won't take so long
>
>
> > -Sameer.
> >
>


Re: Less poll interval on StoreChangelogReader

2018-01-09 Thread Damian Guy
State Store restoration is done on the same thread as processing. It is
actually interleaved with processing, so we keep the poll time small so
that if there is no data immediately available we can continue to process
data from other running tasks.
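
A schematic sketch of that interleaving, not the actual StoreChangelogReader
code (the changelog topic name, end condition and elided processing calls are
placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class RestoreInterleavingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> restoreConsumer = new KafkaConsumer<>(props)) {
            restoreConsumer.assign(Collections.singletonList(
                    new TopicPartition("my-app-mystore-changelog", 0)));  // placeholder
            boolean restored = false;
            while (!restored) {
                // Short timeout: if no changelog data is ready, fall through quickly
                // so already-running tasks can keep being processed in the same loop.
                ConsumerRecords<byte[], byte[]> batch = restoreConsumer.poll(10L);
                for (ConsumerRecord<byte[], byte[]> record : batch) {
                    // apply record.key()/record.value() to the local store (elided)
                }
                // ... process records from active tasks here (elided) ...
                restored = batch.isEmpty();  // placeholder end condition for the sketch
            }
        }
    }
}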

On Tue, 9 Jan 2018 at 08:03 Sameer Kumar  wrote:

> In StoreChangelogReader.restore, we have a very short poll interval of 10
> ms. Any specific reason for the same?
>
> -Sameer.
>


Re: Kafka Streams | Impact on rocksdb stores by Rebalancing

2018-01-09 Thread Damian Guy
On Tue, 9 Jan 2018 at 07:42 Sameer Kumar  wrote:

> Hi,
>
> I would like to understand how rebalancing affects state store
> migration. If I have a cluster of 3 nodes and 1 goes down, so that the partitions
> for node3 get assigned to node1 and node2, does the rocksdb on node1/node2
> also start updating its store from the changelog topic?
>
>
Yes the stores will be migrated to node1 and node2 and they will be
restored from the changelog topic


> If yes, then what impact would this migration process have on querying.
>

You can't query the stores until they have all been restored and the
rebalance ends.

>
> Also, if the state store restoration process takes time, how to make sure
> another rebalance doesn't happen.
>
>
If you don't lose any more nodes then another rebalance won't happen. If
node1 comes back online, then there will be another rebalance, however the
time taken shouldn't be as long as it will already have most of the state
locally, so it only needs to catch up with the remainder of the changelog.
Additionally, you should run with standby tasks. They are updated in the
background and will mean that in the event of failure the other nodes
should already have most of the state locally, so the restoration process
won't take so long
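
For reference, standby tasks are enabled with num.standby.replicas in the
Streams configuration. A minimal sketch against the 1.0 Streams API (the
application id, broker address and topology are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StandbyConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
        // Keep one standby replica of each state store on another instance,
        // updated in the background, so failover only replays the changelog tail.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic");  // placeholder topology
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}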


> -Sameer.
>


Less poll interval on StoreChangelogReader

2018-01-09 Thread Sameer Kumar
In StoreChangelogReader.restore, we have a very short poll interval of 10
ms. Any specific reason for the same?

-Sameer.