RE: [EXTERNAL] Re: Request for access to create KIP

2020-08-13 Thread Koushik Chitta
Accountid: koushikchitta

Cheers,
Koushik

-Original Message-
From: Boyang Chen  
Sent: Sunday, August 9, 2020 6:02 PM
To: dev 
Subject: [EXTERNAL] Re: Request for access to create KIP

Have you created the account already? What's your account id?

On Sat, Aug 8, 2020 at 4:36 PM Koushik Chitta 
wrote:

> Hi Team,
>
> Can you please grant me access to create KIP ?
>
> Thanks,
> Koushik
>


Request for access to create KIP

2020-08-08 Thread Koushik Chitta
Hi Team,

Can you please grant me access to create KIP ?

Thanks,
Koushik


RE: Issue in retention with compact,delete cleanup policy

2020-03-03 Thread Koushik Chitta
Bubbling this up to see if anyone else is in a similar use case.


-Original Message-
From: Koushik Chitta  
Sent: Sunday, February 23, 2020 1:35 PM
To: us...@kafka.apache.org; dev@kafka.apache.org
Subject: [EXTERNAL] Issue in retention with compact,delete cleanup policy

Hi,

I have a Topic with following config.

cleanup.policy  =  compact,delete
segment.bytes = 52428800 (~52 mb)
min.compaction.lag.ms = 1800000 (30 min)
delete.retention.ms = 86400000 (1 day)
retention.ms = 259200000 (3 days)

Ideally, I would want records older than 3 days to be deleted without producing 
an explicit delete (a null value for the key) of the record.
But there can be a case where, due to continuous compaction, a segment contains 
both a very old record (e.g. > 30 days) and a recent one (e.g. 1 hr), which 
makes the whole segment ineligible for retention-based deletion.

Currently I don't see a workaround for this. Please suggest.
I plan to start a KIP to address this use case.
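
For context, the settings above could be applied to an existing topic roughly as
follows -- a sketch assuming kafka-python's KafkaAdminClient; the broker address
and topic name are placeholders:

# Sketch: apply the retention/compaction settings discussed above to a topic.
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

topic_config = ConfigResource(
    ConfigResourceType.TOPIC,
    "my-compacted-topic",
    configs={
        "cleanup.policy": "compact,delete",
        "segment.bytes": "52428800",          # ~52 MB
        "min.compaction.lag.ms": "1800000",   # 30 min
        "delete.retention.ms": "86400000",    # 1 day
        "retention.ms": "259200000",          # 3 days
    },
)

# alter_configs applies the listed configs to the topic resource.
admin.alter_configs([topic_config])
admin.close()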

Thanks,
Koushik



Issue in retention with compact,delete cleanup policy

2020-02-23 Thread Koushik Chitta
Hi,

I have a Topic with following config.

cleanup.policy  =  compact,delete
segment.bytes = 52428800 (~52 mb)
min.compaction.lag.ms = 1800000 (30 min)
delete.retention.ms = 86400000 (1 day)
retention.ms = 259200000 (3 days)

Ideally, I would want records older than 3 days to be deleted without producing 
an explicit delete (a null value for the key) of the record.
But there can be a case where, due to continuous compaction, a segment contains 
both a very old record (e.g. > 30 days) and a recent one (e.g. 1 hr), which 
makes the whole segment ineligible for retention-based deletion.

Currently I don't see a workaround for this. Please suggest.
I plan to start a KIP to address this use case.

Thanks,
Koushik



RE: [VOTE] KIP-455: Create an Administrative API for Replica Reassignment

2019-08-08 Thread Koushik Chitta
Thanks Colin, George.  Can we restart the voting for this KIP?

Thanks,
Koushik 

-Original Message-
From: Colin McCabe  
Sent: Wednesday, August 7, 2019 5:17 PM
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
Reassignment

On Wed, Aug 7, 2019, at 15:41, George Li wrote:
> This email seemed to get lost in the dev email server.  Resending. 
> 
> 
> On Tuesday, August 6, 2019, 10:16:57 PM PDT, George Li 
>  wrote:
> 
> 
> The pending reassignment partitions would be reported as URP (Under 
> Replicated Partitions), or maybe as a separate metric, RURP (Reassignment 
> URP), since it can now be derived from the new AddingReplicas field. An 
> alert could be triggered based on this.
> 

Hi George,

I agree that this would be a great idea for follow-up work.  Check out KIP-352, 
which discusses creating such a metric. :)

> 
> 
> It would be nice if ListPartitionReassignmentsResult could return the 
> "elapsed time/duration" of the current pending reassignments, so the 
> calling client can flag long-running reassignments and alert.  However, 
> what I would be interested in is probably the total # of pending 
> reassignments, because I will submit reassignments in batches, e.g. 50 
> reassignments per batch.  If the pending reassignment # is below the 
> per-batch #, submit more new reassignments = (per_batch_# - pending_#).
> 

It is definitely useful to know what reassignments exist.  If you call 
ListPartitionReassignments, you can count how many results you get, in order to 
implement a policy like that.
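
As an illustration of that top-up policy, here is a small plain-Python sketch;
the pending count is assumed to come from counting ListPartitionReassignments
results, and actually submitting the reassignments (via
AlterPartitionReassignments or kafka-reassign-partitions.sh) is left out:

# Sketch: keep at most MAX_BATCH reassignments in flight, topping up as
# earlier ones complete.
MAX_BATCH = 50

def next_batch(pending_count, queued):
    """Return the queued reassignments to submit next."""
    free_slots = max(MAX_BATCH - pending_count, 0)
    return queued[:free_slots]

# Placeholder queue of (topic, partition) reassignments still to be done.
queued = [("foo1", p) for p in range(200)]
print(len(next_batch(12, queued)))   # 50 - 12 = 38 more can be submitted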

I'm not sure if knowing how long reassignments have been in progress will be 
important or not.  I think we should give people some time to try out the new 
APIs and see what could be improved based on their experience.

> 
> 
> It seems that currently the ReplicaFetcher threads can quite easily crash 
> on certain exceptions, e.g. Java OutOfMemoryError, and then just remain 
> dead (jstack can dump the threads to check the # of running ReplicaFetcher 
> threads) without being restarted automatically, so the broker needs to be 
> bounced.  It would be nice to make the ReplicaFetcher more robust/resilient 
> by catching more exceptions and, if it crashes, restarting it after some 
> time.
> 

This has definitely been an issue in the past, I agree.  Thankfully, we 
recently did improve the robustness of the ReplicaFetcher.  Check out "KIP-461: 
Improve Replica Fetcher behavior at handling partition failure."

cheers,
Colin

> 
> 
> Thanks,
> 
> George
> 
> 
> 
> On 2019/08/06 23:07:19, "Colin McCabe"  wrote: 
> > Hi Koushik,
> > 
> > Thanks for the idea.  This KIP is already pretty big, so I think we'll have 
> > to consider ideas like this in follow-on KIPs.
> > 
> > In general, figuring out what's wrong with replication is a pretty tough 
> > problem.  If we had an API for this, we'd probably want it to be unified, 
> > and not specific to reassigning partitions.
> > 
> > regards,
> > Colin
> > 
> > 
> > On Tue, Aug 6, 2019, at 10:57, Koushik Chitta wrote:
> > > Hey Colin,
> > > 
> > > Can the ListPartitionReassignmentsResult include the status of the 
> > > current reassignment progress of each partition? A reassignment can be 
> > > in progress for different reasons, and the status would give the option 
> > > to alter the current reassignment.
> > > 
> > > Example - a LeaderAndIsrRequest for newly assigned replicas can be 
> > > ignored/errored because of a storage exception, and the reassignment 
> > > batch will then wait indefinitely for the new replicas to come into 
> > > sync with the partition leader.
> > > Showing the status would give an option to alter the affected 
> > > partitions and allow the batch to complete the reassignment.
> > > 
> > > OAR = {1, 2, 3} and RAR = {4,5,6}
> > > 
> > >                  AR              leader/isr
> > > {1,2,3,4,5,6}    1/{1,2,3,4,6}   =>  the LeaderAndIsrRequest was 
> > > lost/skipped for replica 5, and the reassignment operation will wait 
> > > indefinitely for replica 5 to come into sync.
> > > 
> > > 
> > > 
> > > Thanks,
> > > Koushik
> > > 
> > > -Original Message-
> > > From: Jun Rao  
> > > Sent: Friday, August 2, 2019 10:04 AM
> > > To: dev 
> > > Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
> > > Reassignment
> > > 
> > > Hi, Colin,
> > > 
> > > First, since we are changing the format o

RE: [VOTE] KIP-455: Create an Administrative API for Replica Reassignment

2019-08-06 Thread Koushik Chitta
Hey Colin,

Can the ListPartitionReassignmentsResult include the status of the current 
reassignment progress of each partition? A reassignment can be in progress for 
different reasons, and the status would give the option to alter the current 
reassignment.

Example - a LeaderAndIsrRequest for newly assigned replicas can be 
ignored/errored because of a storage exception, and the reassignment batch will 
then wait indefinitely for the new replicas to come into sync with the leader of 
the partition.
Showing the status would give an option to alter the affected partitions and 
allow the batch to complete the reassignment.

OAR = {1, 2, 3} and RAR = {4,5,6}

                 AR              leader/isr
{1,2,3,4,5,6}    1/{1,2,3,4,6}   =>  the LeaderAndIsrRequest was lost/skipped 
for replica 5, and the reassignment operation will wait indefinitely for 
replica 5 to come into sync.
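
To make that check concrete, a small plain-Python sketch (with placeholder
assignments matching the example above) that flags target replicas which have
not yet joined the ISR might look like this:

# Sketch: flag reassignment target replicas that are still missing from the ISR.
def stuck_replicas(target_replicas, isr):
    """Return target replicas that have not yet joined the ISR."""
    return sorted(set(target_replicas) - set(isr))

target_rar = [4, 5, 6]        # reassignment target replicas (RAR)
isr = [1, 2, 3, 4, 6]         # replica 5 never came into sync

print(stuck_replicas(target_rar, isr))   # -> [5]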



Thanks,
Koushik

-Original Message-
From: Jun Rao  
Sent: Friday, August 2, 2019 10:04 AM
To: dev 
Subject: Re: [VOTE] KIP-455: Create an Administrative API for Replica 
Reassignment

Hi, Colin,

First, since we are changing the format of LeaderAndIsrRequest, which is an 
inter broker request, it seems that we will need IBP during rolling upgrade. 
Could we add that to the compatibility section?

Regarding UnsupportedVersionException, even without ZK node version bump, we 
probably want to only use the new ZK value fields after all brokers have been 
upgraded to the new binary. Otherwise, the reassignment task may not be 
completed if the controller changes to a broker still on the old binary.
IBP is one way to achieve that. The main thing is that we need some way for the 
controller to deal with the new ZK fields. Dealing with the additional ZK node 
version bump seems a small thing on top of that?

Thanks,

Jun

On Thu, Aug 1, 2019 at 3:05 PM Colin McCabe  wrote:

> On Thu, Aug 1, 2019, at 12:00, Jun Rao wrote:
> > Hi, Colin,
> >
> > 10. Sounds good.
> >
> > 13. Our current convention is to bump up the version of ZK value if 
> > there is any format change. For example, we have bumped up the 
> > version of the value in /brokers/ids/nnn multiple times and all of 
> > those changes are compatible (just adding new fields). This has the 
> > slight benefit that it makes it clear there is a format change. 
> > Rolling upgrades and downgrades can still be supported with the 
> > version bump. For example, if you downgrade from a compatible change, 
> > you can leave the new format in ZK and the old code will only pick up 
> > fields relevant to the old version. Upgrade will be controlled by the 
> > inter-broker protocol.
>
> Hmm.  If we bump that ZK node version, we will need a new inter-broker 
> protocol version.  We also need to return UnsupportedVersionException 
> from the alterPartitionReassignments and listPartitionReassignments 
> APIs when the IBP is too low.  This sounds doable, although we might 
> need a release note that upgrading the IBP is necessary to allow 
> reassignment operations after an upgrade.
>
> best,
> Colin
>
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Jul 31, 2019 at 1:22 PM Colin McCabe  wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for taking another look at this.
> > >
> > > On Wed, Jul 31, 2019, at 09:22, Jun Rao wrote:
> > > > Hi, Stan,
> > > >
> > > > Thanks for the explanation.
> > > >
> > > > 10. If those new fields in LeaderAndIsr are only needed for future 
> > > > work, perhaps they should be added when we do the future work 
> > > > instead of now?
> > >
> > > I think this ties in with one of the big goals of this KIP, making it 
> > > possible to distinguish reassigning replicas from normal replicas. This 
> > > is the key to follow-on work like being able to ensure that partitions 
> > > with a reassignment don't get falsely flagged as under-replicated in 
> > > the metrics, or implementing reassignment quotas that don't 
> > > accidentally affect normal replication traffic when a replica falls 
> > > out of the ISR.
> > >
> > > For these follow-on improvements, we need to have that information in 
> > > LeaderAndIsrRequest.  We could add the information in a follow-on KIP, 
> > > of course, but then all the improvements are blocked on that follow-on 
> > > KIP.  That would slow things down for all of the downstream KIPs that 
> > > are blocked on this.
> > >
> > > Also, to keep things consistent, I think it would be best if the format 
> > > of the data in the LeaderAndIsrRequest matched the format of the data 
> > > in ZooKeeper.  Since we're deciding on the ZK format in this KIP, I 
> > > think it makes sense to also decide on the format in the 
> > > LeaderAndIsrRequest.
> > >
> > > > > > Should we include those two fields in UpdateMetadata and 
> > > > > > potentially Metadata requests too?
> > >
> > > We had some discussion earlier about how metadata responses to clients 
> > > are getting too large, in part because they 

Use of version attribute in reassignment json of kafka-reassign-partitions.sh

2019-04-11 Thread Koushik Chitta
Hi,

What is the use/intention of the version attribute in the reassignment JSON for 
kafka-reassign-partitions.sh? I see the code (at least in version 1.1) just 
ignores this attribute.

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 
> --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
  {"topic":"foo1","partition":0,"replicas":[3,4]},
  {"topic":"foo2","partition":2,"replicas":[1,2]},
  {"topic":"foo2","partition":0,"replicas":[3,4]},
  {"topic":"foo1","partition":1,"replicas":[2,3]},
  {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
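
For reference, a reassignment file in that same format (including the version
field) can be generated programmatically; a small Python sketch, with
placeholder topics and replica lists, might look like this:

# Sketch: write a reassignment JSON file in the format expected by
# kafka-reassign-partitions.sh.
import json

reassignment = {
    "version": 1,
    "partitions": [
        {"topic": "foo1", "partition": 0, "replicas": [5, 6]},
        {"topic": "foo1", "partition": 1, "replicas": [5, 6]},
        {"topic": "foo2", "partition": 0, "replicas": [5, 6]},
    ],
}

with open("expand-cluster-reassignment.json", "w") as f:
    json.dump(reassignment, f, indent=2)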


Thanks,
Koushik


Inconsistent Replica list for a partition

2018-10-11 Thread Koushik Chitta
Hi,

The number of replicas for partition 1 is lower than for the rest of the 
partitions. In which scenarios can this happen?

Kafka version - 0.10.2

Thanks,
Koushik
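
One way to inspect the per-partition assignment and spot the odd partition out
is a sketch like the following, assuming confluent-kafka-python; the broker
address and topic name are placeholders:

# Sketch: print leader, replicas, and ISR for every partition of a topic so a
# partition with fewer replicas than the others stands out.
from confluent_kafka.admin import AdminClient

admin = AdminClient({"bootstrap.servers": "localhost:9092"})
md = admin.list_topics(topic="my-topic", timeout=10)

for pid, p in sorted(md.topics["my-topic"].partitions.items()):
    print("partition %d: leader=%s replicas=%s isr=%s"
          % (pid, p.leader, p.replicas, p.isrs))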


RE: Kafka consumer to unzip stream of .gz files and read

2018-05-21 Thread Koushik Chitta
You should read the message value as a byte array rather than a string.
Another approach is, while producing, to use Kafka's built-in compression 
(compression.type=gzip on the producer) to get a similar result.
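
For the second approach, a minimal kafka-python sketch might look like this;
the broker address and topic are placeholders, the producer compresses batches
with gzip, and the consumer receives the already-decompressed bytes:

# Sketch: let Kafka handle gzip compression instead of shipping pre-gzipped files.
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="localhost:9092",
                         compression_type="gzip")
producer.send("Airport", b'{"symbol": "ABC", "price": 123.4}')
producer.flush()

# No manual gunzip is needed on the consumer side.
consumer = KafkaConsumer("Airport", bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for message in consumer:
    print(message.value.decode("utf-8"))
    break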


-Original Message-
From: mayur shah  
Sent: Monday, May 21, 2018 1:50 AM
To: us...@kafka.apache.org; dev@kafka.apache.org
Subject: Kafka consumer to unzip stream of .gz files and read

 HI Team,

Greeting!

I am facing an issue with a Kafka consumer in Python; I hope you can help us 
resolve it.

Kafka consumer to unzip stream of .gz files and read 


The Kafka producer is sending .gz files, but the consumer is not able to 
decompress and read them. It fails with "IOError: Not a gzipped file".

Producer -

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Airport < 
~/Downloads/stocks.json.gz

Consumer -

import sys
import gzip
import StringIO
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)
try:
    for message in consumer:
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print(content)
except KeyboardInterrupt:
    sys.exit()

Error at consumer -

Traceback (most recent call last):
  File "consumer.py", line 18, in <module>
    unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
    self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
    self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
    raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file

Regards,
Mayur


SSL Keymanager implementation using OS managed keystore

2018-04-04 Thread Koushik Chitta
Hi,

Running Kafka on Windows, I would like to take advantage of the Windows-managed 
keystore (e.g. Windows-MY, which can be loaded without a user/password) and 
eventually use a custom keymanager implementation.
From the code I see, SSLFactory doesn't allow using an existing keystore, and 
without a keystore it cannot load a custom keymanager implementation.  Is my 
understanding correct here?  Any suggestions are welcome.

Best Regards,
Koushik


RE: [VOTE] KIP-257 - Configurable Quota Management

2018-03-28 Thread Koushik Chitta
+1.  Thanks for the KIP.

-Original Message-
From: Rajini Sivaram  
Sent: Thursday, March 22, 2018 2:57 PM
To: dev 
Subject: [VOTE] KIP-257 - Configurable Quota Management

Hi all,

I would like to start vote on KIP-257 to enable customisation of client quota 
computation:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management

The KIP proposes to make quota management pluggable to enable group-based and 
partition-based quotas for clients.


Thanks,


Rajini