Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-20 Thread Jun Rao
Hi, Mayuresh,

Thanks for the updated KIP. A couple of more comments.

1. Do we convert java.security.Principal to KafkaPrincipal for
authorization check in SimpleAclAuthorizer? If so, it would be useful to
mention that in the wiki so that people can understand how this change
doesn't affect the default authorizer implementation.

2. Currently, we log the principal name in the request log in
RequestChannel, which has the format of "principalType + SEPARATOR + name;".
It would be good if we can keep the same convention after this KIP. One way
to do that is to convert java.security.Principal to KafkaPrincipal for
logging the requests.

Jun


On Fri, Feb 17, 2017 at 5:35 PM, Mayuresh Gharat  wrote:

> Hi Jun,
>
> I have updated the KIP. Would you mind taking another look?
>
> Thanks,
>
> Mayuresh
>
> On Fri, Feb 17, 2017 at 4:42 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Jun,
> >
> > Sure sounds good to me.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Feb 17, 2017 at 1:54 PM, Jun Rao  wrote:
> >
> >> Hi, Mani,
> >>
> >> Good point on using PrincipalBuilder for SASL. It seems that
> >> PrincipalBuilder already has access to Authenticator. So, we could just
> >> enable that in SaslChannelBuilder. We probably could do that in a
> separate
> >> KIP?
> >>
> >> Hi, Mayuresh,
> >>
> >> If you don't think there is a concrete use case for using
> >> PrincipalBuilder in
> >> kafka-acls.sh, perhaps we could do the simpler approach for now?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >>
> >> On Fri, Feb 17, 2017 at 12:23 PM, Mayuresh Gharat <
> >> gharatmayures...@gmail.com> wrote:
> >>
> >> > @Manikumar,
> >> >
> >> > Can you give an example how you are planning to use PrincipalBuilder?
> >> >
> >> > @Jun
> >> > Yes, that is right. To give a brief overview, we just extract the cert
> >> and
> >> > hand it over to a third party library for creating a Principal. So we
> >> > cannot create a Principal from just a string.
> >> > The main motive behind adding the PrincipalBuilder for kafk-acls.sh
> was
> >> > that someone else (who can generate a Principal from map of propertie,
> >> >  for example) can use it.
> >> > As I said, Linkedin is fine with not making any changes to
> Kafka-acls.sh
> >> > for now. But we thought that it would be a good improvement to the
> tool
> >> and
> >> > it makes it more flexible and usable.
> >> >
> >> > Let us know your thoughts, if you would like us to make kafka-acls.sh
> >> more
> >> > flexible and usable and not limited to Authorizer coming out of the
> box.
> >> >
> >> > Thanks,
> >> >
> >> > Mayuresh
> >> >
> >> >
> >> > On Thu, Feb 16, 2017 at 10:18 PM, Manikumar <
> manikumar.re...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hi Jun,
> >> > >
> >> > > yes, we can just customize rules to send full principal name.  I was
> >> > > just thinking to
> >> > > use PrinciplaBuilder interface for implementing SASL rules also. So
> >> that
> >> > > the interface
> >> > > will be consistent across protocols.
> >> > >
> >> > > Thanks
> >> > >
> >> > > On Fri, Feb 17, 2017 at 1:07 AM, Jun Rao  wrote:
> >> > >
> >> > > > Hi, Radai, Mayuresh,
> >> > > >
> >> > > > Thanks for the explanation. Good point on a pluggable authorizer
> can
> >> > > > customize how acls are added. However, earlier, Mayuresh was
> saying
> >> > that
> >> > > in
> >> > > > LinkedIn's customized authorizer, it's not possible to create a
> >> > principal
> >> > > > from string. If that's the case, will adding the principal builder
> >> in
> >> > > > kafka-acl.sh help? If the principal can be constructed from a
> >> string,
> >> > > > wouldn't it be simpler to just let kafka-acl.sh do authorization
> >> based
> >> > on
> >> > > > that string name and not be aware of the principal builder? If you
> >> > still
> >> > > > think there is a need, perhaps you can add a more concrete use
> case
> >> > that
> >> > > > can't be done otherwise?
> >> > > >
> >> > > >
> >> > > > Hi, Mani,
> >> > > >
> >> > > > For SASL, if the authorizer needs the full kerberos principal
> name,
> >> > > > currently, the user can just customize "
> sasl.kerberos.principal.to.
> >> > > > local.rules"
> >> > > > to return the full principal name as the name for authorization,
> >> right?
> >> > > >
> >> > > > Thanks,
> >> > > >
> >> > > > Jun
> >> > > >
> >> > > > On Wed, Feb 15, 2017 at 10:25 AM, Mayuresh Gharat <
> >> > > > gharatmayures...@gmail.com> wrote:
> >> > > >
> >> > > > > @Jun thanks for the comments.Please see the replies inline.
> >> > > > >
> >> > > > > Currently kafka-acl.sh just creates an ACL path in ZK with the
> >> > > principal
> >> > > > > name string.
> >> > > > > > Yes, the kafka-acl.sh calls the addAcl() on the inbuilt
> >> > > > > SimpleAclAuthorizer which in turn creates an ACL in ZK with the
> >> > > Principal
> >> > > > > name string. This is because we supply the SimpleAclAuthorizer
> as
> >> a
> >> > > > > 

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-20 Thread Jun Rao
Hi, Dong,

Sorry for the delay. A few more comments.

20. One complexity that I found in the current KIP is that the way the
broker communicates failed replicas to the controller is inefficient. When
a log directory fails, the broker only sends an indication through ZK to
the controller and the controller has to issue a LeaderAndIsrRequest to
discover which replicas are offline due to log directory failure. An
alternative approach is that when a log directory fails, the broker just
writes the failed the directory and the corresponding topic partitions in a
new failed log directory ZK path like the following.

Failed log directory path:
/brokers/ids/[brokerId]/failed-log-directory/directory1 => { json of the
topic partitions in the log directory }.

The controller just watches for child changes in
/brokers/ids/[brokerId]/failed-log-directory.
After reading this path, the broker knows the exact set of replicas that
are offline and can trigger that replica state change accordingly. This
saves an extra round of LeaderAndIsrRequest handling.

With this new ZK path, we get probably get rid of/broker/topics/[topic]/
partitions/[partitionId]/controller_managed_state. The creation of a new
replica is expected to always succeed unless all log directories fail, in
which case, the broker goes down anyway. Then, during controller failover,
the controller just needs to additionally read from ZK the extra failed log
directory paths, which is many fewer than topics or partitions.

On broker startup, if a log directory becomes available, the corresponding
log directory path in ZK will be removed.

The downside of this approach is that the value of this new ZK path can be
large. However, even with 5K partition per log directory and 100 bytes per
partition, the size of the value is 500KB, still less than the default 1MB
znode limit in ZK.

21. "Broker will remove offline replica from its replica fetcher threads."
The proposal lets the broker remove the replica from the replica fetcher
thread when it detects a directory failure. An alternative is to only do
that until the broker receives the LeaderAndIsrRequest/StopReplicaRequest.
The benefit of this is that the controller is the only one who decides
which replica to be removed from the replica fetcher threads. The broker
also doesn't need additional logic to remove the replica from replica
fetcher threads. The downside is that in a small window, the replica fetch
thread will keep writing to the failed log directory and may pollute the
log4j log.

22. In the current design, there is a potential corner case issue that the
same partition may exist in more than one log directory at some point.
Consider the following steps: (1) a new topic t1 is created and the
controller sends LeaderAndIsrRequest to a broker; (2) the broker creates
partition t1-p1 in log dir1; (3) before the broker sends a response, it
goes down; (4) the broker is restarted with log dir1 unreadable; (5) the
broker receives a new LeaderAndIsrRequest and creates partition t1-p1 on
log dir2; (6) at some point, the broker is restarted with log dir1 fixed.
Now partition t1-p1 is in two log dirs. The alternative approach that I
suggested above may suffer from a similar corner case issue. Since this is
rare, if the broker detects this during broker startup, it can probably
just log an error and exit. The admin can remove the redundant partitions
manually and then restart the broker.

Thanks,

Jun

On Sat, Feb 18, 2017 at 9:31 PM, Dong Lin  wrote:

> Hey Jun,
>
> Could you please let me know if the solutions above could address your
> concern? I really want to move the discussion forward.
>
> Thanks,
> Dong
>
>
> On Tue, Feb 14, 2017 at 8:17 PM, Dong Lin  wrote:
>
> > Hey Jun,
> >
> > Thanks for all your help and time to discuss this KIP. When you get the
> > time, could you let me know if the previous answers address the concern?
> >
> > I think the more interesting question in your last email is where we
> > should store the "created" flag in ZK. I proposed the solution that I
> like
> > most, i.e. store it together with the replica assignment data in the
> /brokers/topics/[topic].
> > In order to expedite discussion, let me provide another two ideas to
> > address the concern just in case the first idea doesn't work:
> >
> > - We can avoid extra controller ZK read when there is no disk failure
> > (95% of time?). When controller starts, it doesn't
> > read controller_managed_state in ZK and sends LeaderAndIsrRequest with
> > "create = false". Only if LeaderAndIsrResponse shows failure for any
> > replica, then controller will read controller_managed_state for this
> > partition and re-send LeaderAndIsrRequset with "create=true" if this
> > replica has not been created.
> >
> > - We can significantly reduce this ZK read time by making
> > controller_managed_state a topic level information in ZK, e.g.
> > /brokers/topics/[topic]/state. Given that most topic has 10+ partition,

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Becket Qin
Hey Jay,

Yeah, I agree that enforcing the CPU time is a little tricky. I am thinking
that maybe we can use the existing request statistics. They are already
very detailed so we can probably see the approximate CPU time from it, e.g.
something like (total_time - request/response_queue_time - remote_time).

I agree with Guozhang that when a user is throttled it is likely that we
need to see if anything has went wrong first, and if the users are well
behaving and just need more resources, we will have to bump up the quota
for them. It is true that pre-allocating CPU time quota precisely for the
users is difficult. So in practice it would probably be more like first set
a relative high protective CPU time quota for everyone and increase that
for some individual clients on demand.

Thanks,

Jiangjie (Becket) Qin


On Mon, Feb 20, 2017 at 5:48 PM, Guozhang Wang  wrote:

> This is a great proposal, glad to see it happening.
>
> I am inclined to the CPU throttling, or more specifically processing time
> ratio instead of the request rate throttling as well. Becket has very well
> summed my rationales above, and one thing to add here is that the former
> has a good support for both "protecting against rogue clients" as well as
> "utilizing a cluster for multi-tenancy usage": when thinking about how to
> explain this to the end users, I find it actually more natural than the
> request rate since as mentioned above, different requests will have quite
> different "cost", and Kafka today already have various request types
> (produce, fetch, admin, metadata, etc), because of that the request rate
> throttling may not be as effective unless it is set very conservatively.
>
> Regarding to user reactions when they are throttled, I think it may differ
> case-by-case, and need to be discovered / guided by looking at relative
> metrics. So in other words users would not expect to get additional
> information by simply being told "hey, you are throttled", which is all
> what throttling does; they need to take a follow-up step and see "hmm, I'm
> throttled probably because of ..", which is by looking at other metric
> values: e.g. whether I'm bombarding the brokers with metadata request,
> which are usually cheap to handle but I'm sending thousands per second; or
> is it because I'm catching up and hence sending very heavy fetching request
> with large min.bytes, etc.
>
> Regarding to the implementation, as once discussed with Jun, this seems not
> very difficult since today we are already collecting the "thread pool
> utilization" metrics, which is a single percentage "aggregateIdleMeter"
> value; but we are already effectively aggregating it for each requests in
> KafkaRequestHandler, and we can just extend it by recording the source
> client id when handling them and aggregating by clientId as well as the
> total aggregate.
>
>
> Guozhang
>
>
>
>
> On Mon, Feb 20, 2017 at 4:27 PM, Jay Kreps  wrote:
>
> > Hey Becket/Rajini,
> >
> > When I thought about it more deeply I came around to the "percent of
> > processing time" metric too. It seems a lot closer to the thing we
> actually
> > care about and need to protect. I also think this would be a very useful
> > metric even in the absence of throttling just to debug whose using
> > capacity.
> >
> > Two problems to consider:
> >
> >1. I agree that for the user it is understandable what lead to their
> >being throttled, but it is a bit hard to figure out the safe range for
> >them. i.e. if I have a new app that will send 200 messages/sec I can
> >probably reason that I'll be under the throttling limit of 300
> req/sec.
> >However if I need to be under a 10% CPU resources limit it may be a
> bit
> >harder for me to know a priori if i will or won't.
> >2. Calculating the available CPU time is a bit difficult since there
> are
> >actually two thread pools--the I/O threads and the network threads. I
> > think
> >it might be workable to count just the I/O thread time as in the
> > proposal,
> >but the network thread work is actually non-trivial (e.g. all the disk
> >reads for fetches happen in that thread). If you count both the
> network
> > and
> >I/O threads it can skew things a bit. E.g. say you have 50 network
> > threads,
> >10 I/O threads, and 8 cores, what is the available cpu time available
> > in a
> >second? I suppose this is a problem whenever you have a bottleneck
> > between
> >I/O and network threads or if you end up significantly
> over-provisioning
> >one pool (both of which are hard to avoid).
> >
> > An alternative for CPU throttling would be to use this api:
> > http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/
> > management/ThreadMXBean.html#getThreadCpuTime(long)
> >
> > That would let you track actual CPU usage across the network, I/O
> threads,
> > and purgatory threads and look at it as a percentage of total cores. I
> > think this fixes many problems 

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Guozhang Wang
This is a great proposal, glad to see it happening.

I am inclined to the CPU throttling, or more specifically processing time
ratio instead of the request rate throttling as well. Becket has very well
summed my rationales above, and one thing to add here is that the former
has a good support for both "protecting against rogue clients" as well as
"utilizing a cluster for multi-tenancy usage": when thinking about how to
explain this to the end users, I find it actually more natural than the
request rate since as mentioned above, different requests will have quite
different "cost", and Kafka today already have various request types
(produce, fetch, admin, metadata, etc), because of that the request rate
throttling may not be as effective unless it is set very conservatively.

Regarding to user reactions when they are throttled, I think it may differ
case-by-case, and need to be discovered / guided by looking at relative
metrics. So in other words users would not expect to get additional
information by simply being told "hey, you are throttled", which is all
what throttling does; they need to take a follow-up step and see "hmm, I'm
throttled probably because of ..", which is by looking at other metric
values: e.g. whether I'm bombarding the brokers with metadata request,
which are usually cheap to handle but I'm sending thousands per second; or
is it because I'm catching up and hence sending very heavy fetching request
with large min.bytes, etc.

Regarding to the implementation, as once discussed with Jun, this seems not
very difficult since today we are already collecting the "thread pool
utilization" metrics, which is a single percentage "aggregateIdleMeter"
value; but we are already effectively aggregating it for each requests in
KafkaRequestHandler, and we can just extend it by recording the source
client id when handling them and aggregating by clientId as well as the
total aggregate.


Guozhang




On Mon, Feb 20, 2017 at 4:27 PM, Jay Kreps  wrote:

> Hey Becket/Rajini,
>
> When I thought about it more deeply I came around to the "percent of
> processing time" metric too. It seems a lot closer to the thing we actually
> care about and need to protect. I also think this would be a very useful
> metric even in the absence of throttling just to debug whose using
> capacity.
>
> Two problems to consider:
>
>1. I agree that for the user it is understandable what lead to their
>being throttled, but it is a bit hard to figure out the safe range for
>them. i.e. if I have a new app that will send 200 messages/sec I can
>probably reason that I'll be under the throttling limit of 300 req/sec.
>However if I need to be under a 10% CPU resources limit it may be a bit
>harder for me to know a priori if i will or won't.
>2. Calculating the available CPU time is a bit difficult since there are
>actually two thread pools--the I/O threads and the network threads. I
> think
>it might be workable to count just the I/O thread time as in the
> proposal,
>but the network thread work is actually non-trivial (e.g. all the disk
>reads for fetches happen in that thread). If you count both the network
> and
>I/O threads it can skew things a bit. E.g. say you have 50 network
> threads,
>10 I/O threads, and 8 cores, what is the available cpu time available
> in a
>second? I suppose this is a problem whenever you have a bottleneck
> between
>I/O and network threads or if you end up significantly over-provisioning
>one pool (both of which are hard to avoid).
>
> An alternative for CPU throttling would be to use this api:
> http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/
> management/ThreadMXBean.html#getThreadCpuTime(long)
>
> That would let you track actual CPU usage across the network, I/O threads,
> and purgatory threads and look at it as a percentage of total cores. I
> think this fixes many problems in the reliability of the metric. It's
> meaning is slightly different as it is just CPU (you don't get charged for
> time blocking on I/O) but that may be okay because we already have a
> throttle on I/O. The downside is I think it is possible this api can be
> disabled or isn't always available and it may also be expensive (also I've
> never used it so not sure if it really works the way i think).
>
> -Jay
>
> On Mon, Feb 20, 2017 at 3:17 PM, Becket Qin  wrote:
>
> > If the purpose of the KIP is only to protect the cluster from being
> > overwhelmed by crazy clients and is not intended to address resource
> > allocation problem among the clients, I am wondering if using request
> > handling time quota (CPU time quota) is a better option. Here are the
> > reasons:
> >
> > 1. request handling time quota has better protection. Say we have request
> > rate quota and set that to some value like 100 requests/sec, it is
> possible
> > that some of the requests are very expensive actually take a lot of time
> to
> > handle. In 

Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-20 Thread Matthias J. Sax
Hi,

thanks for updating the KIP. Couple of follow up comments:

* Nit: Why is "Reset to Earliest" and "Reset to Latest" a "reset by
time" option -- IMHO it belongs to "reset by position"?


* Nit: Description of "Reset to Earliest"

> using Kafka Consumer's `auto.offset.reset` to `earliest`

I think this is strictly speaking not correct (as auto.offset.reset only
triggered if no valid offset is found, but this tool explicitly modified
committed offset), and should be phrased as

> using Kafka Consumer's #seekToBeginning()

-> similar issue for description of "Reset to Latest"


* Main option: rename to --reset-offsets (plural instead of singular)


* Scenario Options: I would remove "reset" from all options, because the
main argument "--reset-offset" says already what to do:

> bin/kafka-consumer-groups.sh --reset-offset --reset-to-datetime XXX

better (IMHO):

> bin/kafka-consumer-groups.sh --reset-offsets --to-datetime XXX



* Option 1.e ("print and export current offset") is not intuitive to use
IMHO. The main option is "--reset-offset" but nothing happens if no
scenario is specified. It is also not specified, what the output should
look like?

Furthermore, --describe should actually show currently committed offset
for a group. So it seems to be redundant to have the same option in
--reset-offsets


* Option 2.a: I would rename to "--reset-to-offset" (or considering the
comment above to "--to-offset")


* Option 2.b and 2.c: I would unify to "--shift-offsets-by" (or similar)
and accept positive/negative values


* About Scope "all": maybe it's better to have an option "--all-topics"
(or similar). IMHO explicit arguments are preferable over implicit
setting to guard again accidental miss use of the tool.


* Scope: I also think, that "--topic" (singular) and "--topics" (plural)
are too similar and easy to use in a wrong way (ie, mix up) -- maybe we
can have two options that are easier to distinguish.


* I still think that JSON is not the best format (it's too verbose/hard
to write for humans from scratch). A simple CSV format with implicit
schema (topic,partition,offset) would be sufficient.


* Why does the JSON contain "group_id" field -- there is parameter
"--group" to specify the group ID. Would one overwrite the other (what
order) or would there be an error if "--group" is used in combination
with "--reset-from-file"?



-Matthias




On 2/17/17 6:43 AM, Jorge Esteban Quilcate Otoya wrote:
> Hi,
> 
> according to the feedback, I've updated the KIP:
> 
> - We have added and ordered the scenarios, scopes and executions of the
> Reset Offset tool.
> - Consider it as an extension to the current `ConsumerGroupCommand` tool
> - Execution will be possible without generating JSON files.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
> 
> Looking forward to your feedback!
> 
> Jorge.
> 
> El mié., 8 feb. 2017 a las 23:23, Jorge Esteban Quilcate Otoya (<
> quilcate.jo...@gmail.com>) escribió:
> 
>> Great. I think I got the idea. What about this options:
>>
>> Scenarios:
>>
>> 1. Current status
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1´
>>
>> 2. To Datetime
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-datetime
>> 2017-01-01T00:00:00.000´
>>
>> 3. To Period
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-period P2D´
>>
>> 4. To Earliest
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-earliest´
>>
>> 5. To Latest
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-latest´
>>
>> 6. Minus 'n' offsets
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-minus n´
>>
>> 7. Plus 'n' offsets
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-plus n´
>>
>> 8. To specific offset
>>
>> ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to x´
>>
>> Scopes:
>>
>> a. All topics used by Consumer Group
>>
>> Don't specify --topics
>>
>> b. Specific List of Topics
>>
>> Add list of values in --topics t1,t2,tn
>>
>> c. One Topic, all Partitions
>>
>> Add one topic and no partitions values: --topic t1
>>
>> d. One Topic, List of Partitions
>>
>> Add one topic and partitions values: --topic t1 --partitions 0,1,2
>>
>> About Reset Plan (JSON file):
>>
>> I think is still valid to have the option to persist reset configuration
>> as a file, but I agree to give the option to run the tool without going
>> down to the JSON file.
>>
>> Execution options:
>>
>> 1. Without execution argument (No args):
>>
>> Print out results (reset plan)
>>
>> 2. With --execute argument:
>>
>> Run reset process
>>
>> 3. With --output argument:
>>
>> Save result in a JSON format.
>>
>> 4. Only with --execute option and --reset-file (path to JSON)
>>
>> Reset based on file
>>
>> 4. Only with --verify option and --reset-file (path to JSON)
>>
>> Verify file values with current offsets
>>
>> I think we can remove --generate-and-execute because is a bit 

Re: [DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-02-20 Thread Onur Karaman
Regarding 1: We won't lose the offset from zookeeper upon partition
transfer from OZKCC/MDZKCC to MEZKCC because MEZKCC has
"dual.commit.enabled" set to true as well as "offsets.storage" set to
kafka. The combination of these configs results in the consumer fetching
offsets from both kafka and zookeeper and just picking the greater of the
two.

On Mon, Feb 20, 2017 at 4:33 PM, Dong Lin  wrote:

> Hey Onur,
>
> Thanks for the well-written KIP! I have two questions below.
>
> 1) In the process of migrating from OZKCCs and MDZKCCs to MEZKCCs, we will
> may a mix of OZKCCs, MDZKCCs and MEZKCCs. OZKCC and MDZKCC will only commit
> to zookeeper and MDZKCC will use kafka-based offset storage. Would we lose
> offset committed to zookeeper by a MDZKCC if a partition ownership if
> transferred from a MDZKCC to a MEZKCC?
>
> 2) Suppose every process in the group is running MEZKCC. Each MEZKCC has a
> zookeeper-based partition assignment and kafka-based partition assignment.
> Is it guaranteed that these two assignments are exactly the same across
> processes? If not, say the zookeeper-based assignment assigns p1, p2 to
> process 1, and p3 to process 2. And kafka-based assignment assigns p1, p3
> to process 1, and p2 to process 2. Say process 1 handles receives the
> notification to switch to kafka-based notification before process 2, it is
> possible that during a short period of time p3 will be consumed by both
> processes?
>
> This period is probably short and I am not sure how many messages may be
> duplicated as a result. But it seems possible to avoid this completely
> according to an idea that Becket suggested in a previous discussion. The
> znode /consumers//migration/mode can contain a sequence number
> that increment for each switch. Say the znode is toggled to kafka with
> sequence number 2, each MEZKCC will commit offset to with number 2 in the
> metadata for partitions that it currently owns according to the zk-based
> partition assignment, and then periodically fetches the committed offset
> and the metadata for the partitions that it should own according to the
> kafka-based partition assignment. Each MEZKCC only starts consumption when
> the metadata has incremented to the number 2.
>
> Thanks,
> Dong
>
>
>
>
>
>
>
>
> On Mon, Feb 20, 2017 at 12:04 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com
> > wrote:
>
> > Hey everyone.
> >
> > I made a KIP that provides a mechanism for migrating from
> > ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism for
> > rolling back from KafkaConsumer to ZookeeperConsumerConnector:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-125%
> > 3A+ZookeeperConsumerConnector+to+KafkaConsumer+Migration+and+Rollback
> >
> > Comments are welcome.
> >
> > - Onur
> >
>


Re: [DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-02-20 Thread Dong Lin
Hey Onur,

Thanks for the well-written KIP! I have two questions below.

1) In the process of migrating from OZKCCs and MDZKCCs to MEZKCCs, we will
may a mix of OZKCCs, MDZKCCs and MEZKCCs. OZKCC and MDZKCC will only commit
to zookeeper and MDZKCC will use kafka-based offset storage. Would we lose
offset committed to zookeeper by a MDZKCC if a partition ownership if
transferred from a MDZKCC to a MEZKCC?

2) Suppose every process in the group is running MEZKCC. Each MEZKCC has a
zookeeper-based partition assignment and kafka-based partition assignment.
Is it guaranteed that these two assignments are exactly the same across
processes? If not, say the zookeeper-based assignment assigns p1, p2 to
process 1, and p3 to process 2. And kafka-based assignment assigns p1, p3
to process 1, and p2 to process 2. Say process 1 handles receives the
notification to switch to kafka-based notification before process 2, it is
possible that during a short period of time p3 will be consumed by both
processes?

This period is probably short and I am not sure how many messages may be
duplicated as a result. But it seems possible to avoid this completely
according to an idea that Becket suggested in a previous discussion. The
znode /consumers//migration/mode can contain a sequence number
that increment for each switch. Say the znode is toggled to kafka with
sequence number 2, each MEZKCC will commit offset to with number 2 in the
metadata for partitions that it currently owns according to the zk-based
partition assignment, and then periodically fetches the committed offset
and the metadata for the partitions that it should own according to the
kafka-based partition assignment. Each MEZKCC only starts consumption when
the metadata has incremented to the number 2.

Thanks,
Dong








On Mon, Feb 20, 2017 at 12:04 PM, Onur Karaman  wrote:

> Hey everyone.
>
> I made a KIP that provides a mechanism for migrating from
> ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism for
> rolling back from KafkaConsumer to ZookeeperConsumerConnector:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-125%
> 3A+ZookeeperConsumerConnector+to+KafkaConsumer+Migration+and+Rollback
>
> Comments are welcome.
>
> - Onur
>


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Jay Kreps
Hey Becket/Rajini,

When I thought about it more deeply I came around to the "percent of
processing time" metric too. It seems a lot closer to the thing we actually
care about and need to protect. I also think this would be a very useful
metric even in the absence of throttling just to debug whose using capacity.

Two problems to consider:

   1. I agree that for the user it is understandable what lead to their
   being throttled, but it is a bit hard to figure out the safe range for
   them. i.e. if I have a new app that will send 200 messages/sec I can
   probably reason that I'll be under the throttling limit of 300 req/sec.
   However if I need to be under a 10% CPU resources limit it may be a bit
   harder for me to know a priori if i will or won't.
   2. Calculating the available CPU time is a bit difficult since there are
   actually two thread pools--the I/O threads and the network threads. I think
   it might be workable to count just the I/O thread time as in the proposal,
   but the network thread work is actually non-trivial (e.g. all the disk
   reads for fetches happen in that thread). If you count both the network and
   I/O threads it can skew things a bit. E.g. say you have 50 network threads,
   10 I/O threads, and 8 cores, what is the available cpu time available in a
   second? I suppose this is a problem whenever you have a bottleneck between
   I/O and network threads or if you end up significantly over-provisioning
   one pool (both of which are hard to avoid).

An alternative for CPU throttling would be to use this api:
http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/management/ThreadMXBean.html#getThreadCpuTime(long)

That would let you track actual CPU usage across the network, I/O threads,
and purgatory threads and look at it as a percentage of total cores. I
think this fixes many problems in the reliability of the metric. It's
meaning is slightly different as it is just CPU (you don't get charged for
time blocking on I/O) but that may be okay because we already have a
throttle on I/O. The downside is I think it is possible this api can be
disabled or isn't always available and it may also be expensive (also I've
never used it so not sure if it really works the way i think).

-Jay

On Mon, Feb 20, 2017 at 3:17 PM, Becket Qin  wrote:

> If the purpose of the KIP is only to protect the cluster from being
> overwhelmed by crazy clients and is not intended to address resource
> allocation problem among the clients, I am wondering if using request
> handling time quota (CPU time quota) is a better option. Here are the
> reasons:
>
> 1. request handling time quota has better protection. Say we have request
> rate quota and set that to some value like 100 requests/sec, it is possible
> that some of the requests are very expensive actually take a lot of time to
> handle. In that case a few clients may still occupy a lot of CPU time even
> the request rate is low. Arguably we can carefully set request rate quota
> for each request and client id combination, but it could still be tricky to
> get it right for everyone.
>
> If we use the request time handling quota, we can simply say no clients can
> take up to more than 30% of the total request handling capacity (measured
> by time), regardless of the difference among different requests or what is
> the client doing. In this case maybe we can quota all the requests if we
> want to.
>
> 2. The main benefit of using request rate limit is that it seems more
> intuitive. It is true that it is probably easier to explain to the user
> what does that mean. However, in practice it looks the impact of request
> rate quota is not more quantifiable than the request handling time quota.
> Unlike the byte rate quota, it is still difficult to give a number about
> impact of throughput or latency when a request rate quota is hit. So it is
> not better than the request handling time quota. In fact I feel it is
> clearer to tell user that "you are limited because you have taken 30% of
> the CPU time on the broker" than otherwise something like "your request
> rate quota on metadata request has reached".
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Feb 20, 2017 at 2:23 PM, Jay Kreps  wrote:
>
> > I think this proposal makes a lot of sense (especially now that it is
> > oriented around request rate) and fills the biggest remaining gap in the
> > multi-tenancy story.
> >
> > I think for intra-cluster communication (StopReplica, etc) we could avoid
> > throttling entirely. You can secure or otherwise lock-down the cluster
> > communication to avoid any unauthorized external party from trying to
> > initiate these requests. As a result we are as likely to cause problems
> as
> > solve them by throttling these, right?
> >
> > I'm not so sure that we should exempt the consumer requests such as
> > heartbeat. It's true that if we throttle an app's heartbeat requests it
> may
> > cause it to fall out of its 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-20 Thread Becket Qin
Thanks for the explanation, Guozhang. That makes sense.

On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang  wrote:

> Thanks Becket.
>
> Actually sequence is associated with a message, not a message set. For
> example if a message set sent by producer contains 100 messages, and the
> first message's sequence is 5, then the last message's sequence number
> would be 104, and the next message set's first sequence is expected to be
> 105.
>
>
> Guozhang
>
>
> On Sun, Feb 19, 2017 at 4:48 PM, Becket Qin  wrote:
>
> > +1. Thanks for the great work on the KIP!
> >
> > I have only one minor question, in the wiki (and the doc) the new message
> > set format has a "FirstSequence" field, should it just be "Sequence" if
> the
> > sequence is always associated with a message set?
> >
> > On Fri, Feb 17, 2017 at 3:28 AM, Michael Pearce 
> > wrote:
> >
> > > +0
> > >
> > > I think need some unified agreement on the VarInts.
> > >
> > > Would this also change in all other area’s of the protocol, e.g. value
> > and
> > > key length in message protocol, to keep this uniform across all
> protocols
> > > going forwards?
> > >
> > >
> > >
> > > On 17/02/2017, 00:23, "Apurva Mehta"  wrote:
> > >
> > > Hi Jun,
> > >
> > > Thanks for the reply. Comments inline.
> > >
> > > On Thu, Feb 16, 2017 at 2:29 PM, Jun Rao  wrote:
> > >
> > > > Hi, Apurva,
> > > >
> > > > Thanks for the reply. A couple of comment below.
> > > >
> > > > On Wed, Feb 15, 2017 at 9:45 PM, Apurva Mehta <
> apu...@confluent.io
> > >
> > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Answers inline:
> > > > >
> > > > > 210. Pid snapshots: Is the number of pid snapshot configurable
> or
> > > > hardcoded
> > > > > > with 2? When do we decide to roll a new snapshot? Based on
> > time,
> > > byte,
> > > > or
> > > > > > offset? Is that configurable too?
> > > > > >
> > > >
> > >
> > >
> > > > When a replica becomes a follower, we do a bit log truncation.
> > > Having an
> > > > older snapshot allows us to recover the PID->sequence mapping
> much
> > > quicker
> > > > than rescanning the whole log.
> > >
> > >
> > > This is a good point. I have updated the doc with a more detailed
> > > proposal.
> > > Essentially, snapshots will be created on a periodic basis. A
> > > reasonable
> > > period would be every 30 or 60 seconds. We will keep at most 2
> copies
> > > of
> > > the snapshot file. With this setup, we would have to replay at most
> > 60
> > > or
> > > 120 seconds of the log in the event of log truncation during leader
> > > failover.
> > >
> > > If we need to make any of this configurable, we can expose a config
> > in
> > > the
> > > future. It would be easier to add a config we need than remove one
> > with
> > > marginal utility.
> > >
> > >
> > > >
> > > > > >
> > > > > > 211. I am wondering if we should store ExpirationTime in the
> > > producer
> > > > > > transactionalId mapping message as we do in the producer
> > > transaction
> > > > > status
> > > > > > message. If a producer only calls initTransactions(), but
> never
> > > > publishes
> > > > > > any data, we still want to be able to expire and remove the
> > > producer
> > > > > > transactionalId mapping message.
> > > > > >
> > > > > >
> > > > > Actually, the document was inaccurate. The transactionalId will
> > be
> > > > expired
> > > > > only if there is no active transaction, and the age of the last
> > > > transaction
> > > > > with that transactionalId is older than the transactioanlId
> > > expiration
> > > > > time. With these semantics, storing the expiration time in the
> > > > > transactionalId mapping message won't be useful, since the
> > > expiration
> > > > time
> > > > > is a moving target based on transaction activity.
> > > > >
> > > > > I have updated the doc with a clarification.
> > > > >
> > > > >
> > > > >
> > > > Currently, the producer transactionalId mapping message doesn't
> > carry
> > > > ExpirationTime, but the producer transaction status message does.
> > > It would
> > > > be useful if they are consistent.
> > > >
> > > >
> > > You are right. The document has been updated to remove the
> > > ExpirationTime
> > > from the transaction status messages as well. Any utility for this
> > > field
> > > can be achieved by using the timestamp of the message itself along
> > with
> > > another expiration time (like transactionalId expiration time,
> > > transaction
> > > expiration time, etc.).
> > >
> > > Thanks,
> > > Apurva
> > >
> > >
> > > The information contained in this email is strictly confidential and
> for
> > > the use of the addressee only, unless otherwise indicated. If you are
> 

[jira] [Commented] (KAFKA-4782) change chroot for a kafka instance

2017-02-20 Thread Stephane Maarek (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15875133#comment-15875133
 ] 

Stephane Maarek commented on KAFKA-4782:


Use https://github.com/kshchepanovskyi/zkcopy

> change chroot for a kafka instance
> --
>
> Key: KAFKA-4782
> URL: https://issues.apache.org/jira/browse/KAFKA-4782
> Project: Kafka
>  Issue Type: Improvement
>Reporter: polarbear
>
> Hello
> I am a beginner of the kafka.
> Our kafka is "kafka_2.11-0.10.0.1.jar.asc", should be 0.10.0.1.
> chroot is default "/".
> Our user ask us to change the chroot from "/" to "/kafka".
> I change the parameter zookeeper.connect to add /kafka, but we cannot find  
> the old topic in "/kafka", so I think it should be a new instance.
> Which tool can we use to migrate the topic from the old "/" to new "/kafka"?
> Thank you



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Becket Qin
If the purpose of the KIP is only to protect the cluster from being
overwhelmed by crazy clients and is not intended to address resource
allocation problem among the clients, I am wondering if using request
handling time quota (CPU time quota) is a better option. Here are the
reasons:

1. request handling time quota has better protection. Say we have request
rate quota and set that to some value like 100 requests/sec, it is possible
that some of the requests are very expensive actually take a lot of time to
handle. In that case a few clients may still occupy a lot of CPU time even
the request rate is low. Arguably we can carefully set request rate quota
for each request and client id combination, but it could still be tricky to
get it right for everyone.

If we use the request time handling quota, we can simply say no clients can
take up to more than 30% of the total request handling capacity (measured
by time), regardless of the difference among different requests or what is
the client doing. In this case maybe we can quota all the requests if we
want to.

2. The main benefit of using request rate limit is that it seems more
intuitive. It is true that it is probably easier to explain to the user
what does that mean. However, in practice it looks the impact of request
rate quota is not more quantifiable than the request handling time quota.
Unlike the byte rate quota, it is still difficult to give a number about
impact of throughput or latency when a request rate quota is hit. So it is
not better than the request handling time quota. In fact I feel it is
clearer to tell user that "you are limited because you have taken 30% of
the CPU time on the broker" than otherwise something like "your request
rate quota on metadata request has reached".

Thanks,

Jiangjie (Becket) Qin


On Mon, Feb 20, 2017 at 2:23 PM, Jay Kreps  wrote:

> I think this proposal makes a lot of sense (especially now that it is
> oriented around request rate) and fills the biggest remaining gap in the
> multi-tenancy story.
>
> I think for intra-cluster communication (StopReplica, etc) we could avoid
> throttling entirely. You can secure or otherwise lock-down the cluster
> communication to avoid any unauthorized external party from trying to
> initiate these requests. As a result we are as likely to cause problems as
> solve them by throttling these, right?
>
> I'm not so sure that we should exempt the consumer requests such as
> heartbeat. It's true that if we throttle an app's heartbeat requests it may
> cause it to fall out of its consumer group. However if we don't throttle it
> it may DDOS the cluster if the heartbeat interval is set incorrectly or if
> some client in some language has a bug. I think the policy with this kind
> of throttling is to protect the cluster above any individual app, right? I
> think in general this should be okay since for most deployments this
> setting is meant as more of a safety valve---that is rather than set
> something very close to what you expect to need (say 2 req/sec or whatever)
> you would have something quite high (like 100 req/sec) with this meant to
> prevent a client gone crazy. I think when used this way allowing those to
> be throttled would actually provide meaningful protection.
>
> -Jay
>
>
>
> On Fri, Feb 17, 2017 at 9:05 AM, Rajini Sivaram 
> wrote:
>
> > Hi all,
> >
> > I have just created KIP-124 to introduce request rate quotas to Kafka:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 124+-+Request+rate+quotas
> >
> > The proposal is for a simple percentage request handling time quota that
> > can be allocated to **, ** or **. There
> > are a few other suggestions also under "Rejected alternatives". Feedback
> > and suggestions are welcome.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Jay Kreps
I think this proposal makes a lot of sense (especially now that it is
oriented around request rate) and fills the biggest remaining gap in the
multi-tenancy story.

I think for intra-cluster communication (StopReplica, etc) we could avoid
throttling entirely. You can secure or otherwise lock-down the cluster
communication to avoid any unauthorized external party from trying to
initiate these requests. As a result we are as likely to cause problems as
solve them by throttling these, right?

I'm not so sure that we should exempt the consumer requests such as
heartbeat. It's true that if we throttle an app's heartbeat requests it may
cause it to fall out of its consumer group. However if we don't throttle it
it may DDOS the cluster if the heartbeat interval is set incorrectly or if
some client in some language has a bug. I think the policy with this kind
of throttling is to protect the cluster above any individual app, right? I
think in general this should be okay since for most deployments this
setting is meant as more of a safety valve---that is rather than set
something very close to what you expect to need (say 2 req/sec or whatever)
you would have something quite high (like 100 req/sec) with this meant to
prevent a client gone crazy. I think when used this way allowing those to
be throttled would actually provide meaningful protection.

-Jay



On Fri, Feb 17, 2017 at 9:05 AM, Rajini Sivaram 
wrote:

> Hi all,
>
> I have just created KIP-124 to introduce request rate quotas to Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 124+-+Request+rate+quotas
>
> The proposal is for a simple percentage request handling time quota that
> can be allocated to **, ** or **. There
> are a few other suggestions also under "Rejected alternatives". Feedback
> and suggestions are welcome.
>
> Thank you...
>
> Regards,
>
> Rajini
>


[DISCUSS] KIP-125: ZookeeperConsumerConnector to KafkaConsumer Migration and Rollback

2017-02-20 Thread Onur Karaman
Hey everyone.

I made a KIP that provides a mechanism for migrating from
ZookeeperConsumerConnector to KafkaConsumer as well as a mechanism for
rolling back from KafkaConsumer to ZookeeperConsumerConnector:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-125%3A+ZookeeperConsumerConnector+to+KafkaConsumer+Migration+and+Rollback

Comments are welcome.

- Onur


[jira] [Commented] (KAFKA-2045) Memory Management on the consumer

2017-02-20 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874896#comment-15874896
 ] 

Armin Braun commented on KAFKA-2045:


add a suggestion on how to concretely tackly this and KAFKA-1895 here -> 
https://issues.apache.org/jira/browse/KAFKA-1895?focusedCommentId=15874894=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15874894

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-2045) Memory Management on the consumer

2017-02-20 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874896#comment-15874896
 ] 

Armin Braun edited comment on KAFKA-2045 at 2/20/17 6:10 PM:
-

added a suggestion on how to concretely tackly this and KAFKA-1895 here -> 
https://issues.apache.org/jira/browse/KAFKA-1895?focusedCommentId=15874894=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15874894


was (Author: original-brownbear):
add a suggestion on how to concretely tackly this and KAFKA-1895 here -> 
https://issues.apache.org/jira/browse/KAFKA-1895?focusedCommentId=15874894=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15874894

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Guozhang Wang
>
> We need to add the memory management on the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-1895) Investigate moving deserialization and decompression out of KafkaConsumer

2017-02-20 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874894#comment-15874894
 ] 

Armin Braun commented on KAFKA-1895:


[~jkreps] Maybe let me give the perspective of a user, using Kafka in a 
situation where high throughput and a low GC footprint are key and this change 
would be extremely valuable.

What I would ideally like to see here and in KAFKA-2045 is this:

Create a lower level interface along the lines of Hadoop's 
[RawKeyValueIterator](http://atetric.com/atetric/javadoc/org.apache.hadoop/hadoop-mapreduce-client-core/2.6.2/src-html/org/apache/hadoop/mapred/RawKeyValueIterator.html)
 that would back the current KafkaConsumer interface:

{code}public interface RawRecordIterator {
/**
 * Gets the current raw key.
 *
 * @return Gets the current raw key
 * @throws IOException
 */
ByteBuffer getKey() throws IOException;

/**
 * Gets the current raw value.
 *
 * @return Gets the current raw value
 * @throws IOException
 */
ByteBuffer getValue() throws IOException;

/**
 * Sets up the current key and value (for getKey and getValue).
 *
 * @return true if there exists a key/value,
 * false otherwise.
 * false implies the need for making the next call to
 * poll()
 * @throws IOException
 */
boolean next() throws IOException;

/**
 * Polls Kafka for more Records.
 *
 * @throws IOException
 */
void poll() throws IOException;

/**
 * Closes the iterator so that the underlying memory can be released.
 *
 * @throws IOException
 */
void close() throws IOException;
}
{code}

I think this one is pretty idea if implemented properly:

* If you start from some initial size for two backing `ByteBuffer`s and grow 
them as needed, if a poll doesn't fit into the existing ones you can have a 
nice and fast zero copy iterator (simply store the offsets and lengths for each 
record in `int` arrays, that you could also set up in some 0gc way easily and 
move position and limit on the buffers on every call to `next`).

This interface can easily be used to generate the current 
`org.apache.kafka.clients.consumer.ConsumerRecords` in an efficient way (but 
would also if made public) allow users to implement more efficient 
serialization/deserialization approaches on top. Examples that would massively 
profit from this approach would be Avro or Hadoop's Writables, that could keep 
reusing the same key and value objects without the need to constantly run 
expensive `byte[]` allocations.

Having this as the basis of the current implementation and publicly exposed 
would deal with this concern you voiced:

{code}
You could potentially implement both but you would need to change consumer.poll 
to allow passing back in the ConsumerRecords instance for reuse when you are 
done with it.
{code}

-> no need for this then in my opinion. Users can just come up with their own 
way to handle memory. Passing back records that aren't needed anymore seems 
like less than practical to implement. Much easier if this is just done by 
direct reuse of a single object like in Hadoop, Avro or Spark.

Also 

{code}
2. We don't mangle the code to badly in doing so
{code}

wouldn't be much of a concern either imo since you can keep the old interface 
around on top. When implementing this I'd simply try to get this interface in 
at the lowest possible level and keep the existing codebase on top. This would 
probably still allow some optimization in the existing code on top + it would 
give uses the ability to write much faster consumers.
Again kind of like Avor has it with giving you the option to reuse and object 
when deserializing, but still keeping the slower API that gives you a new one 
one every `next` call around on top of things 
(https://avro.apache.org/docs/current/gettingstartedjava.html#Deserializing is 
what I mean).

PS: If this is seen as a viable approach I'd be happy to give it a go. This 
should be doable for me in a "not overly invasive to the existing codebase" way 
I think.


> Investigate moving deserialization and decompression out of KafkaConsumer
> -
>
> Key: KAFKA-1895
> URL: https://issues.apache.org/jira/browse/KAFKA-1895
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Jay Kreps
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and 
> deserializes them into ConsumerRecords which are then handed back as the 
> result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the 
> single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such 

[jira] [Commented] (KAFKA-2096) Enable keepalive socket option for broker to prevent socket leak

2017-02-20 Thread Faisal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874848#comment-15874848
 ] 

Faisal commented on KAFKA-2096:
---

Does this solution also resolve following error in spark streaming direct mode 
connecting to Kafka?
*Too many open files, java.net.SocketException*
After running 5-10 days with 10 seconds interval , my spark streaming get this 
error on driver node that i only see in driver log file.
Kafka version: 0.8.2.0
Spark streaming: 1.5.0-cdh5.5.6


> Enable keepalive socket option for broker to prevent socket leak
> 
>
> Key: KAFKA-2096
> URL: https://issues.apache.org/jira/browse/KAFKA-2096
> Project: Kafka
>  Issue Type: Improvement
>  Components: network
>Affects Versions: 0.8.2.1
>Reporter: Allen Wang
>Assignee: Allen Wang
>Priority: Critical
> Fix For: 0.9.0.0
>
> Attachments: patch.diff
>
>
> We run a Kafka 0.8.2.1 cluster in AWS with large number of producers (> 
> 1). Also the number of producer instances scale up and down significantly 
> on a daily basis.
> The issue we found is that after 10 days, the open file descriptor count will 
> approach the limit of 32K. An investigation of these open file descriptors 
> shows that a significant portion of these are from client instances that are 
> terminated during scaling down. Somehow they still show as "ESTABLISHED" in 
> netstat. We suspect that the AWS firewall between the client and broker 
> causes this issue.
> We attempted to use "keepalive" socket option to reduce this socket leak on 
> broker and it appears to be working. Specifically, we added this line to 
> kafka.network.Acceptor.accept():
>   socketChannel.socket().setKeepAlive(true)
> It is confirmed during our experiment of this change that entries in netstat 
> where the client instance is terminated were probed as configured in 
> operating system. After configured number of probes, the OS determined that 
> the peer is no longer alive and the entry is removed, possibly after an error 
> in Kafka to read from the channel and closing the channel. Also, our 
> experiment shows that after a few days, the instance was able to keep a 
> stable low point of open file descriptor count, compared with other instances 
> where the low point keeps increasing day to day.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Kafka exception

2017-02-20 Thread Yuanjia
Hi Liwu,
Correct me if I am wrong.
When calling the method ConsumerConnector.shutdown(), it will send  
"ZookeeperConsumerConnector.shutdownCommand" to the queue, not set 
ConsumerIterator's state is NOT_READY directly. So the consumer will continue 
consuming until get the shutdownCommand in the queue. 
Is there any exception information when calling the method 
ConsumerIterator.makeNext()?




Yuanjia Li
 
From: 揣立武
Date: 2017-02-16 14:29
To: dev
CC: users; 陈希; 涂扬
Subject: Kafka exception
 
Hi,all! Our program uses the high level consumer api(the version is 0.8.x). 
Sometimes the program will throw an exception in the 42th row in 
kafka.utils.IteratorTemplate class,the content is "throw new 
IllegalStateException("Expected item but none found.")". 
 
I think it is a race condition problem between the close thread and the consume 
thread. When the close thread calling the method ConsumerConnector.shutdown(), 
it will set ConsumerIterator's state is NOT_READY. But at the same time, the 
consume thread calls the method ConsumerIterator.hasNext() and goes to the 67th 
row in  kafka.utils.IteratorTemplate class,the content is "if(state == DONE) 
{", the if will be false that means has a item. And when calling the 
ConsumerIterator.next(), it will throw that exception.
 
Have you ever had this problem? Please tell me how to deal with it, thanks!
 
 
 


[jira] [Work started] (KAFKA-4703) Test with two SASL_SSL listeners with different JAAS contexts

2017-02-20 Thread Balint Molnar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4703 started by Balint Molnar.

> Test with two SASL_SSL listeners with different JAAS contexts
> -
>
> Key: KAFKA-4703
> URL: https://issues.apache.org/jira/browse/KAFKA-4703
> Project: Kafka
>  Issue Type: Test
>Reporter: Ismael Juma
>Assignee: Balint Molnar
>  Labels: newbie
>
> [~rsivaram] suggested the following in 
> https://github.com/apache/kafka/pull/2406:
> {quote}
> I think this feature allows two SASL_SSL listeners, one for external and one 
> for internal and the two can use different mechanisms and different JAAS 
> contexts. That makes the multi-mechanism configuration neater. I think it 
> will be useful to have an integration test for this, perhaps change 
> SaslMultiMechanismConsumerTest.
> {quote}
> And my reply:
> {quote}
> I think it's a bit tricky to support multiple listeners in 
> KafkaServerTestHarness. Maybe it's easier to do the test you suggest in 
> MultipleListenersWithSameSecurityProtocolTest.
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4782) change chroot for a kafka instance

2017-02-20 Thread Jayesh Thakrar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874650#comment-15874650
 ] 

Jayesh Thakrar commented on KAFKA-4782:
---

A good question and not an uncommon situation.
However I don't know if there is a generic tool/api to "move" a znode or set of 
znodes around.
I think you may have to write a custom program to do the task.
If you do, it could be a good contribution back to the Zookeeper community that 
will be useful to many beyond just the Kafka project.

> change chroot for a kafka instance
> --
>
> Key: KAFKA-4782
> URL: https://issues.apache.org/jira/browse/KAFKA-4782
> Project: Kafka
>  Issue Type: Improvement
>Reporter: polarbear
>
> Hello
> I am a beginner of the kafka.
> Our kafka is "kafka_2.11-0.10.0.1.jar.asc", should be 0.10.0.1.
> chroot is default "/".
> Our user ask us to change the chroot from "/" to "/kafka".
> I change the parameter zookeeper.connect to add /kafka, but we cannot find  
> the old topic in "/kafka", so I think it should be a new instance.
> Which tool can we use to migrate the topic from the old "/" to new "/kafka"?
> Thank you



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2577: MINOR: add 2 failure test methods

2017-02-20 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/2577

MINOR: add 2 failure test methods



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka minor-benchmark-args

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2577.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2577


commit 1831792880425893a9c6b4de17a66776422c819b
Author: Eno Thereska 
Date:   2017-01-26T12:21:12Z

Checkpoint

commit 134df74f469aa2db65ea7941db45a7ef61b8248d
Author: Eno Thereska 
Date:   2017-01-27T10:40:32Z

Checkpoint, runs on local

commit 0494d5bc47aac9e091142e2224d9000dbbd3e51e
Author: Ubuntu 
Date:   2017-01-29T20:02:50Z

Adjusted import

commit f02ff62f672a5f68cdb71f1c29f9687039db13be
Author: Eno Thereska 
Date:   2017-01-29T20:36:41Z

Revert log4j file

commit 84c716d233f161d3e5aa344e718ad3cbf70b99bf
Author: Eno Thereska 
Date:   2017-01-30T09:52:20Z

Fixed ending logic

commit 51552d97d389b7f01b882ced775b286b8c3f02d1
Author: Eno Thereska 
Date:   2017-01-30T17:33:58Z

Default for nightly

commit 191b1eccdded07703a92ff33b7039670081dcfa8
Author: Ubuntu 
Date:   2017-01-30T17:36:13Z

3 tests

commit 1d3ab26243893f0b2ae9a193237a8c1c36a22eec
Author: Eno Thereska 
Date:   2017-01-30T17:45:13Z

Merge remote-tracking branch 'origin/trunk' into minor-benchmark-args

commit ee6a6a48f00a200939a2dd6d3bec6f6bfafb3f29
Author: Eno Thereska 
Date:   2017-01-30T19:41:24Z

Merge with trunk

commit b889b80eaa9c864d713214f02e6ecd7e8ff8e432
Author: Eno Thereska 
Date:   2017-01-30T19:42:30Z

Merged with branch

commit 1d9e5bd637831a7c1307eeddb9edf71b3f94d5fe
Author: Eno Thereska 
Date:   2017-01-31T15:58:05Z

Reduce number of latch countdowns

commit 88759344338659c27c88d7c7aa10cc3cb917c69a
Author: Eno Thereska 
Date:   2017-02-01T12:45:55Z

Adjusted num nodes for nightly

commit b1e36283a8a987363bf93a1be7a78e806c6926a4
Author: Eno Thereska 
Date:   2017-02-02T09:40:12Z

Merge remote-tracking branch 'origin/trunk' into minor-benchmark-args

commit d9d0de515cd660cf27dba04494583ec924259743
Author: Eno Thereska 
Date:   2017-02-03T12:13:22Z

Added count test, default is now 2 runs nightly

commit 9484d61717ba6ac6d0a41fe99eba6a9c30fd13b6
Author: Eno Thereska 
Date:   2017-02-03T14:35:29Z

Add count topic

commit 9c028411828338eac8069374c843ac3c2d955904
Author: Eno Thereska 
Date:   2017-02-08T11:01:24Z

Refactor and clean up code, address Guozhang's comments

commit abe24a03556ab6905180f7f738334cd857863b6c
Author: Eno Thereska 
Date:   2017-02-08T11:03:00Z

Removed debugging code

commit 8b0a7e4dad67e1142269b58dae5444aa907f1857
Author: Eno Thereska 
Date:   2017-02-08T14:34:14Z

Get the results for each individual run instead of adding them up

commit 7d1968cce975d07c760b2248c6578e5841f5cece
Author: Eno Thereska 
Date:   2017-02-08T15:03:33Z

Run at scale 1 only until KAFKA-4716 is fixed

commit 4e63ce1f7e6dd2fe8cc02cc362b277b660ae8287
Author: Eno Thereska 
Date:   2017-02-08T15:57:19Z

Merge remote-tracking branch 'origin/trunk' into minor-benchmark-args

commit 38281a2bcc91e30ba141f0b7e32ece0f03491293
Author: Eno Thereska 
Date:   2017-02-08T19:01:48Z

Added comments on class

commit c40309f9a8f3997ab2efdc18786f1da6901601d0
Author: Eno Thereska 
Date:   2017-02-16T14:55:19Z

Checkpoint

commit 464c397ed87b5c1e4f0f55bf49dfaeb674c16ef9
Author: Eno Thereska 
Date:   2017-02-16T15:04:21Z

Merged

commit 8c7740cea906ee1c0ffa5159bc6ae78606e7a640
Author: Eno Thereska 
Date:   2017-02-16T15:14:05Z

Merged with trunk

commit 88b521c4225392c7311224ae5cc9b77bfaf2a21c
Author: Eno Thereska 
Date:   2017-02-17T16:57:55Z

Checkpoint

commit 634d1b389c69fdbe13c3a15c400b44bfd139f050
Author: Eno Thereska 
Date:   2017-02-17T18:35:23Z

Restore stats

commit 38e38c4cd9305544814d4854dbc45dab4039c397
Author: Eno Thereska 
Date:   2017-02-20T13:11:32Z

Ignore failure tests for the nightly

commit f8d44247d699211c2c6ad7d0fbfac4a748dc6b0a
Author: Eno Thereska 
Date:   2017-02-20T13:16:38Z

Some 

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Rajini Sivaram
I have updated the KIP to use request rates instead of request processing
time,

I have removed all requests that require ClusterAction permission
(LeaderAndIsr and UpdateMetdata as well in addition to stop/shutdown). But
I have left Metadata request in. Quota windows which limit the maximum
delay tend to be small (1 second by default) compared to request timeout or
max.block.ms and even the existing byte rate quotas can impact the time
taken to fetch metadata if the metadata request is queued behind a produce
request (for instance). So I don't think clients will need any additional
exception handling code for request rate quotas beyond what they already
need for byte rate quotas. Clients can flood the broker with metadata
requests (eg. producer with retry.backoff.ms=0 sending a message to a
non-existent topic), so it makes sense to throttle metadata requests.


Thanks,

Rajini

On Mon, Feb 20, 2017 at 11:55 AM, Dong Lin  wrote:

> Hey Rajini,
>
> Thanks for the explanation. I have some follow up questions regarding the
> types of requests that will be covered by this quota. Since this KIP focus
> only on throttling the traffic between client and broker and client never
> sends LeaderAndIsrRequest to broker, should we exclude LeaderAndIsrRequest
> from this KIP?
>
> Besides, I am still not sure we should throttle MetadataUpdateRequeset. The
> benefits of throttling MetadataUpdateRequest seems little since it doesn't
> increase with user traffic. Client only sends MetadataUpdateRequest when
> there is partition leadership change or when client metadata has expired.
> On the other hand, if we throttle MetadataUpdateRequest, there is chance
> that MetadataUpdateRequest doesn't get update in time and user may receive
> exception. This seems like a big interface change because user will have to
> change application code to handle such exception. Note that the current
> rate-based quota will reduce traffic without throwing any exception to
> user.
>
> Anyway, I am looking forward to the updated KIP:)
>
> Thanks,
> Dong
>
> On Mon, Feb 20, 2017 at 2:43 AM, Rajini Sivaram 
> wrote:
>
> > Dong, Onur & Becket,
> >
> > Thank you all for the very useful feedback.
> >
> > The choice of request handling time as opposed to request rate was based
> on
> > the observation in KAFKA-4195
> >  that request rates
> may
> > be less intuitive to configure than percentage utilization. But since the
> > KIP is measuring time rather than request pool utilization as suggested
> in
> > the JIRA, I agree that request rate would probably work better than
> > percentage. So I am inclined to change the KIP to throttle on request
> rates
> > (e.g 100 requests per second) rather than percentage. Average request
> rates
> > are exposed as metrics, so admin can configure quotas based on that. And
> > the values are more meaningful from the client application point of
> view. I
> > am still interested in feedback regarding the second rejected alternative
> > that throttles based on percentage utilization of resource handler pool.
> > That was the suggestion from Jun/Ismael in KAFKA-4195, but I couldn't see
> > how that would help in the case where a small number of connections
> pushed
> > a continuous stream of short requests. Suggestions welcome.
> >
> > Responses to other questions above:
> >
> > - (Dong): The KIP proposes to throttle most requests (and not just
> > Produce/Fetch) since the goal is to control usage of broker resources. So
> > LeaderAndIsrRequest and MetadataRequest will also be throttled. The few
> > requests not being throttled are timing-sensitive.
> >
> > - (Dong): The KIP does not propose to throttle inter-broker traffic based
> > on request rates. The most frequent requests in inter-broker traffic are
> > fetch requests and a well configured broker would use reasonably good
> > values of min.bytes and max.wait that avoids overloading the broker
> > unnecessarily with fetch requests. The existing byte-rate based quotas
> > should be sufficient in this case.
> >
> > - (Onur): Quota window configuration - this is the existing configuration
> > quota.window.size.seconds (also used for byte-rate quotas)
> >
> > - (Becket): The main issue that the KIP is addressing is clients flooding
> > the broker with small requests (eg. fetch with max.wait.ms=0), which can
> > overload the broker and delay requests from other clients/users even
> though
> > the byte rate is quite small. CPU quota reflects the resource usage on
> the
> > broker that the KIP is attempting to limit. Since this is the time on the
> > local broker, it shouldn't vary much depending on acks=-1 etc. but I do
> > agree on the unpredictability of time based quotas. Switching from
> request
> > processing time to request rates will address this. Would you still be
> > concerned that "*Users do not have direct control over the request rate,
> > i.e. users do **not know 

[jira] [Commented] (KAFKA-4686) Null Message payload is shutting down broker

2017-02-20 Thread Hannu Valtonen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874555#comment-15874555
 ] 

Hannu Valtonen commented on KAFKA-4686:
---

Oh yeah, thought to mention that just before the payload msg thing occurs, 
there were a few SSL disconnect messages in the Kafka server log. (It would be 
great BTW if it gave some info on which client/from which IP disconnected)

WARN Failed to send SSL Close message  
(org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
at org.apache.kafka.common.network.Selector.close(Selector.java:487)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at kafka.network.Processor.poll(SocketServer.scala:476)
at kafka.network.Processor.run(SocketServer.scala:416)
at java.lang.Thread.run(Thread.java:745)

> Null Message payload is shutting down broker
> 
>
> Key: KAFKA-4686
> URL: https://issues.apache.org/jira/browse/KAFKA-4686
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.1
> Environment: Amazon Linux AMI release 2016.03 kernel 
> 4.4.19-29.55.amzn1.x86_64
>Reporter: Rodrigo Queiroz Saramago
> Fix For: 0.10.3.0
>
> Attachments: KAFKA-4686-NullMessagePayloadError.tar.xz, 
> kafkaServer.out
>
>
> Hello, I have a test environment with 3 brokers and 1 zookeeper nodes, in 
> which clients connect using two-way ssl authentication. I use kafka version 
> 0.10.1.1, the system works as expected for a while, but if the broker goes 
> down and then is restarted, something got corrupted and is not possible start 
> broker again, it always fails with the same error. What this error mean? What 
> can I do in this case? Is this the expected behavior?
> [2017-01-23 07:03:28,927] ERROR There was an error in one of the threads 
> during logs loading: kafka.common.KafkaException: Message payload is null: 
> Message(magic = 0, attributes = 1, crc = 4122289508, key = null, payload = 
> null) (kafka.log.LogManager)
> [2017-01-23 07:03:28,929] FATAL Fatal error during KafkaServer startup. 
> Prepare to shutdown (kafka.server.KafkaServer)
> kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
> attributes = 1, crc = 4122289508, key = null, payload = null)
> at 
> kafka.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:90)
> at 
> kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
> at kafka.message.MessageAndOffset.firstOffset(MessageAndOffset.scala:33)
> at kafka.log.LogSegment.recover(LogSegment.scala:223)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:218)
> at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:179)
> at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at kafka.log.Log.loadSegments(Log.scala:179)
> at kafka.log.Log.(Log.scala:108)
> at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:151)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:58)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> [2017-01-23 07:03:28,946] INFO shutting down (kafka.server.KafkaServer)
> [2017-01-23 07:03:28,949] INFO Terminate ZkClient event thread. 
> (org.I0Itec.zkclient.ZkEventThread)
> [2017-01-23 07:03:28,954] INFO EventThread shut down for session: 
> 0x159bd458ae70008 (org.apache.zookeeper.ClientCnxn)
> [2017-01-23 07:03:28,954] INFO Session: 0x159bd458ae70008 closed 
> (org.apache.zookeeper.ZooKeeper)
> 

[jira] [Commented] (KAFKA-4686) Null Message payload is shutting down broker

2017-02-20 Thread Hannu Valtonen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15874455#comment-15874455
 ] 

Hannu Valtonen commented on KAFKA-4686:
---

Hit this again in a test enviroment of ours: 
Kafka server log snippet: (this repeats ad infinitum)

[2017-02-20 12:19:16,931] ERROR [KafkaApi-0] Error when handling request Name: 
FetchRequest; Version: 0; CorrelationId: 4; ClientId: kafka-python; ReplicaId: 
-1; MaxWait:  ms; MinBytes: 1 bytes; MaxBytes:2147483647 bytes; 
RequestInfo: 
([6c999dfb-231c-4e8d-9c5b-19010c550a87-msgs,0],PartitionFetchInfo(0,16777216)) 
(kafka.server.KafkaApis)
kafka.common.KafkaException: Message payload is null: Message(magic = 0, 
attributes = 2, crc = 1331266146, key = null, payload = null)
at 
kafka.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:90)
at 
kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at 
kafka.log.FileMessageSet$$anonfun$toMessageFormat$1.apply(FileMessageSet.scala:276)
at 
kafka.log.FileMessageSet$$anonfun$toMessageFormat$1.apply(FileMessageSet.scala:269)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at kafka.message.MessageSet.foreach(MessageSet.scala:71)
at kafka.log.FileMessageSet.toMessageFormat(FileMessageSet.scala:269)
at kafka.server.KafkaApis$$anonfun$29.apply(KafkaApis.scala:490)
at kafka.server.KafkaApis$$anonfun$29.apply(KafkaApis.scala:477)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$sendResponseCallback$3(KafkaApis.scala:477)
at 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:542)
at 
kafka.server.KafkaApis$$anonfun$handleFetchRequest$1.apply(KafkaApis.scala:542)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:497)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:534)
at kafka.server.KafkaApis.handle(KafkaApis.scala:79)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)

Then the dumping you requested of the topic: 

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 
/srv/kafka/6c999dfb-231c-4e8d-9c5b-19010c550a87-msgs-0/.log 
--deep-iteration
Dumping 
/srv/kafka/6c999dfb-231c-4e8d-9c5b-19010c550a87-msgs-0/.log
Starting offset: 0
offset: 0 position: 0 CreateTime: -1 isvalid: true payloadsize: 9438 magic: 1 
compresscodec: NoCompressionCodec crc: 3724646734
offset: 1 position: 7227 CreateTime: -1 isvalid: true payloadsize: 9571 magic: 
1 compresscodec: NoCompressionCodec crc: 942316008
offset: 2 position: 14504 CreateTime: -1 isvalid: true payloadsize: 9571 magic: 
1 compresscodec: NoCompressionCodec crc: 2440026867
offset: 3 position: 21781 CreateTime: -1 isvalid: true payloadsize: 9571 magic: 
1 compresscodec: NoCompressionCodec crc: 2902450727
Exception in thread "main" kafka.common.KafkaException: Message payload is 
null: Message(magic = 0, attributes = 2, crc = 1331266146, key = null, payload 
= null)
at 
kafka.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:90)
at 
kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at 
kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$getIterator(DumpLogSegments.scala:352)
at 
kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpLog$1.apply(DumpLogSegments.scala:311)
at 
kafka.tools.DumpLogSegments$$anonfun$kafka$tools$DumpLogSegments$$dumpLog$1.apply(DumpLogSegments.scala:310)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at 
kafka.tools.DumpLogSegments$.kafka$tools$DumpLogSegments$$dumpLog(DumpLogSegments.scala:310)
at 
kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:96)
at 
kafka.tools.DumpLogSegments$$anonfun$main$1.apply(DumpLogSegments.scala:92)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at kafka.tools.DumpLogSegments$.main(DumpLogSegments.scala:92)
at kafka.tools.DumpLogSegments.main(DumpLogSegments.scala)

kafka-topics.sh --describe for the topic:


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Dong Lin
Hey Rajini,

Thanks for the explanation. I have some follow up questions regarding the
types of requests that will be covered by this quota. Since this KIP focus
only on throttling the traffic between client and broker and client never
sends LeaderAndIsrRequest to broker, should we exclude LeaderAndIsrRequest
from this KIP?

Besides, I am still not sure we should throttle MetadataUpdateRequeset. The
benefits of throttling MetadataUpdateRequest seems little since it doesn't
increase with user traffic. Client only sends MetadataUpdateRequest when
there is partition leadership change or when client metadata has expired.
On the other hand, if we throttle MetadataUpdateRequest, there is chance
that MetadataUpdateRequest doesn't get update in time and user may receive
exception. This seems like a big interface change because user will have to
change application code to handle such exception. Note that the current
rate-based quota will reduce traffic without throwing any exception to user.

Anyway, I am looking forward to the updated KIP:)

Thanks,
Dong

On Mon, Feb 20, 2017 at 2:43 AM, Rajini Sivaram 
wrote:

> Dong, Onur & Becket,
>
> Thank you all for the very useful feedback.
>
> The choice of request handling time as opposed to request rate was based on
> the observation in KAFKA-4195
>  that request rates may
> be less intuitive to configure than percentage utilization. But since the
> KIP is measuring time rather than request pool utilization as suggested in
> the JIRA, I agree that request rate would probably work better than
> percentage. So I am inclined to change the KIP to throttle on request rates
> (e.g 100 requests per second) rather than percentage. Average request rates
> are exposed as metrics, so admin can configure quotas based on that. And
> the values are more meaningful from the client application point of view. I
> am still interested in feedback regarding the second rejected alternative
> that throttles based on percentage utilization of resource handler pool.
> That was the suggestion from Jun/Ismael in KAFKA-4195, but I couldn't see
> how that would help in the case where a small number of connections pushed
> a continuous stream of short requests. Suggestions welcome.
>
> Responses to other questions above:
>
> - (Dong): The KIP proposes to throttle most requests (and not just
> Produce/Fetch) since the goal is to control usage of broker resources. So
> LeaderAndIsrRequest and MetadataRequest will also be throttled. The few
> requests not being throttled are timing-sensitive.
>
> - (Dong): The KIP does not propose to throttle inter-broker traffic based
> on request rates. The most frequent requests in inter-broker traffic are
> fetch requests and a well configured broker would use reasonably good
> values of min.bytes and max.wait that avoids overloading the broker
> unnecessarily with fetch requests. The existing byte-rate based quotas
> should be sufficient in this case.
>
> - (Onur): Quota window configuration - this is the existing configuration
> quota.window.size.seconds (also used for byte-rate quotas)
>
> - (Becket): The main issue that the KIP is addressing is clients flooding
> the broker with small requests (eg. fetch with max.wait.ms=0), which can
> overload the broker and delay requests from other clients/users even though
> the byte rate is quite small. CPU quota reflects the resource usage on the
> broker that the KIP is attempting to limit. Since this is the time on the
> local broker, it shouldn't vary much depending on acks=-1 etc. but I do
> agree on the unpredictability of time based quotas. Switching from request
> processing time to request rates will address this. Would you still be
> concerned that "*Users do not have direct control over the request rate,
> i.e. users do **not know when a request will be sent by the clients*"?
>
> Jun/Ismael,
>
> I am interested in your views on request rate based quotas and whether we
> should still consider utilization of the resource handler pool.
>
>
> Many thanks,
>
> Rajini
>
>
> On Sun, Feb 19, 2017 at 11:54 PM, Becket Qin  wrote:
>
> > Thanks for the KIP, Rajini,
> >
> > If I understand correctly the proposal was essentially trying to quota
> the
> > CPU usage (that is probably why time slice is used instead of request
> rate)
> > while the existing quota we have is for network bandwidth.
> >
> > Given we are trying to throttle both CPU and Network, that implies the
> > following patterns for the clients:
> > 1. High CPU usage, high network usage.
> > 2. High CPU usage, low network usage.
> > 3. Low CPU usage, high network usage.
> > 4. Low CPU usage, low network usage
> >
> > Theoretically the existing quota addresses case 3 & 4. And this KIP seems
> > trying to address case 1 & 2. However, it might be helpful to understand
> > what we want to achieve with CPU and network quotas.
> >
> > People mainly use quota for two 

Re: Kafka Connect / Access to OffsetStorageReader from SourceConnector

2017-02-20 Thread Florian Hussonnois
Hi Jason,

Yes, this is the idea. The connector assigns a subset of files to each
task.

A task stores the size of file, the bytes offset and the bytes size of the
last sent record as a source offsets.
A file is finished when recordBytesOffsets + recordBytesSize =
fileBytesSize.

The connector should be able to start a thread in background to track
offsets for each assigned file.
When all tasks has finished the connector can stop tasks or assigned new
files by requesting tasks reconfiguration.

Another advantage of monitoring source offsets from the connector is detect
slow or failed tasks and if necessary to be able to restart all tasks.

Thanks,

2017-02-18 6:47 GMT+01:00 Jason Gustafson :

> Hey Florian,
>
> Can you explain a bit more how having access to the offset storage from the
> connector helps in your use case? I guess you are planning to use offsets
> to be able to tell when a task has finished a file?
>
> Thanks,
> Jason
>
> On Fri, Feb 17, 2017 at 4:45 AM, Florian Hussonnois  >
> wrote:
>
> > Hi Kafka Team,
> >
> > I'm developping a connector which need to monitor the progress of its
> tasks
> > in order to be able to request a tasks reconfiguration in some
> situations.
> >
> > Our connector is pretty simple. It's used to stream a thousands of files
> > into Kafka. The connector scans directories then schedules each task
> with a
> > set of assigned files.
> > When tasks are no longer required or new files are detected the connector
> > requests a reconfiguration.
> >
> > In addition, files are store into a shared storage which is accessible
> from
> > each connect worker. In that way, we can distribute file streaming.
> >
> > For that prupose, it would be very convenient to have access to an
> > offsetStorageReader instance from either the Connector class or the
> > ConnectorContext class.
> >
> > I found a similar question:
> > https://www.mail-archive.com/dev@kafka.apache.org/msg50579.html
> >
> > Do you think this improvement could be considered ? I can contribute to
> it.
> >
> > Thanks,
> >
> > --
> > Florian HUSSONNOIS
> >
>



-- 
Florian HUSSONNOIS


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Rajini Sivaram
Dong, Onur & Becket,

Thank you all for the very useful feedback.

The choice of request handling time as opposed to request rate was based on
the observation in KAFKA-4195
 that request rates may
be less intuitive to configure than percentage utilization. But since the
KIP is measuring time rather than request pool utilization as suggested in
the JIRA, I agree that request rate would probably work better than
percentage. So I am inclined to change the KIP to throttle on request rates
(e.g 100 requests per second) rather than percentage. Average request rates
are exposed as metrics, so admin can configure quotas based on that. And
the values are more meaningful from the client application point of view. I
am still interested in feedback regarding the second rejected alternative
that throttles based on percentage utilization of resource handler pool.
That was the suggestion from Jun/Ismael in KAFKA-4195, but I couldn't see
how that would help in the case where a small number of connections pushed
a continuous stream of short requests. Suggestions welcome.

Responses to other questions above:

- (Dong): The KIP proposes to throttle most requests (and not just
Produce/Fetch) since the goal is to control usage of broker resources. So
LeaderAndIsrRequest and MetadataRequest will also be throttled. The few
requests not being throttled are timing-sensitive.

- (Dong): The KIP does not propose to throttle inter-broker traffic based
on request rates. The most frequent requests in inter-broker traffic are
fetch requests and a well configured broker would use reasonably good
values of min.bytes and max.wait that avoids overloading the broker
unnecessarily with fetch requests. The existing byte-rate based quotas
should be sufficient in this case.

- (Onur): Quota window configuration - this is the existing configuration
quota.window.size.seconds (also used for byte-rate quotas)

- (Becket): The main issue that the KIP is addressing is clients flooding
the broker with small requests (eg. fetch with max.wait.ms=0), which can
overload the broker and delay requests from other clients/users even though
the byte rate is quite small. CPU quota reflects the resource usage on the
broker that the KIP is attempting to limit. Since this is the time on the
local broker, it shouldn't vary much depending on acks=-1 etc. but I do
agree on the unpredictability of time based quotas. Switching from request
processing time to request rates will address this. Would you still be
concerned that "*Users do not have direct control over the request rate,
i.e. users do **not know when a request will be sent by the clients*"?

Jun/Ismael,

I am interested in your views on request rate based quotas and whether we
should still consider utilization of the resource handler pool.


Many thanks,

Rajini


On Sun, Feb 19, 2017 at 11:54 PM, Becket Qin  wrote:

> Thanks for the KIP, Rajini,
>
> If I understand correctly the proposal was essentially trying to quota the
> CPU usage (that is probably why time slice is used instead of request rate)
> while the existing quota we have is for network bandwidth.
>
> Given we are trying to throttle both CPU and Network, that implies the
> following patterns for the clients:
> 1. High CPU usage, high network usage.
> 2. High CPU usage, low network usage.
> 3. Low CPU usage, high network usage.
> 4. Low CPU usage, low network usage
>
> Theoretically the existing quota addresses case 3 & 4. And this KIP seems
> trying to address case 1 & 2. However, it might be helpful to understand
> what we want to achieve with CPU and network quotas.
>
> People mainly use quota for two different purposes:
> a) protecting the broker from misbehaving clients, and
> b) resource distribution for multi-tenancy.
>
> I agree that generally speaking CPU time is a suitable metric to quota on
> for CPU usage and would work for a). However, as Dong and Onur noticed, it
> is not easy to quantify the impact for the end users at application level
> with a throttled CPU time. If the purpose of the CPU quota is only for
> protection, maybe we don't need a user facing CPU quota.
>
> That said, a user facing CPU quota could be useful for virtualization,
> which maybe related to multi-tenancy but is a little different. Imagine
> there are 10 services sharing the same physical Kafka cluster. With CPU
> time quota and network bandwidth quota, each service can provision a
> logical Kafka cluster with some reserved CPU time and network bandwidth.
> And in this case the quota will be on per logic cluster. Not sure if this
> is what the KIP is intended in the future, though. It would be good if the
> KIP can be more clear on what exact scenarios the CPU quota is trying to
> address.
>
> As of the request rate quota, while it seems easy to enforce and intuitive,
> there are some caveats.
> 1. Users do not have direct control over the request rate, i.e. users do
> not known when a 

[jira] [Created] (KAFKA-4782) change chroot for a kafka instance

2017-02-20 Thread polarbear (JIRA)
polarbear created KAFKA-4782:


 Summary: change chroot for a kafka instance
 Key: KAFKA-4782
 URL: https://issues.apache.org/jira/browse/KAFKA-4782
 Project: Kafka
  Issue Type: Improvement
Reporter: polarbear


Hello

I am a beginner of the kafka.
Our kafka is "kafka_2.11-0.10.0.1.jar.asc", should be 0.10.0.1.
chroot is default "/".

Our user ask us to change the chroot from "/" to "/kafka".
I change the parameter zookeeper.connect to add /kafka, but we cannot find  the 
old topic in "/kafka", so I think it should be a new instance.
Which tool can we use to migrate the topic from the old "/" to new "/kafka"?

Thank you




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)