Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Michael Pearce
+1 on this slimmer version of our proposal

I definitely think we can reduce the Id space from the proposed int32 (4 bytes) down to 
int16 (2 bytes); it saves on space, and for headers we wouldn't expect the number of 
ids in concurrent use to be that high.

I would wonder if we should keep the value byte array length as int32, though, 
as this is the standard max array length in Java. Saying that, it is a header, and 
I guess limiting the size is sensible and would work for all the use cases we 
have in mind, so I'm happy with limiting this.

Do people generally concur on Magnus's slimmer version? Anyone see any issues 
if we moved from int32 to int16?

Re configurable ids per plugin over a global registry: that would also work for us.  
As such, if this has better consensus than the proposed global registry I'd be 
happy to change to that.

I was already sold on ints over strings for keys ;)

Cheers
Mike


From: Magnus Edenhill 
Sent: Monday, November 7, 2016 10:10:21 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Hi,

I'm +1 for adding generic message headers, but I do share the concerns
previously aired on this thread and during the KIP meeting.

So let me propose a slimmer alternative that does not require any sort of
global header registry, does not affect broker performance or operations,
and adds as little overhead as possible.


Message

The protocol Message type is extended with a Headers array consisting of
Tags, where a Tag is defined as:
   int16 Id
   int16 Len  // binary_data length
   binary_data[Len]  // opaque binary data
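
(For illustration only, a rough Java sketch of this framing; the int32 array-count
prefix and all names here are assumptions, not part of the proposal:)

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class TagFramingSketch {
    static final class Tag {
        final short id;
        final byte[] data;
        Tag(short id, byte[] data) { this.id = id; this.data = data; }
    }

    // Encode as [int32 count][int16 Id, int16 Len, data[Len]]*; the count prefix
    // is an assumption, the proposal itself only defines the per-Tag layout.
    static ByteBuffer encode(List<Tag> tags) {
        int size = 4;
        for (Tag t : tags) size += 2 + 2 + t.data.length;   // 4 bytes of framing per header
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(tags.size());
        for (Tag t : tags) {
            buf.putShort(t.id);
            buf.putShort((short) t.data.length);
            buf.put(t.data);
        }
        buf.flip();
        return buf;
    }

    static List<Tag> decode(ByteBuffer buf) {
        int count = buf.getInt();
        List<Tag> tags = new ArrayList<Tag>(count);
        for (int i = 0; i < count; i++) {
            short id = buf.getShort();
            byte[] data = new byte[buf.getShort()];
            buf.get(data);
            // a consumer with no configured mapping for this id would simply ignore the entry
            tags.add(new Tag(id, data));
        }
        return tags;
    }
}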


Ids
---
The Id space is not centrally managed, so whenever an application needs to
add headers, or use an eco-system plugin that does, its Id allocation will
need to be manually configured.
This moves the allocation concern from the global space down to
organization level and avoids the risk of id conflicts.
Example pseudo-config for some app:
sometrackerplugin.tag.sourcev3.id=1000
dbthing.tag.tablename.id=1001
myschemareg.tag.schemaname.id=1002
myschemareg.tag.schemaversion.id=1003


Each header-writing or header-reading plugin must provide means (typically
through configuration) to specify the tag for each header it uses. Defaults
should be avoided.
A consumer silently ignores tags it does not have a mapping for (since the
binary_data can't be parsed without knowing what it is).

Id range 0..999 is reserved for future use by the broker and must not be
used by plugins.



Broker
-
The broker does not process the tags (other than the standard protocol
syntax verification), it simply stores and forwards them as opaque data.

Standard message translation (removal of Headers) kicks in for older
clients.


Why not string ids?
-
String ids might seem like a good idea, but:
 * does not really solve uniqueness
 * consumes a lot of space (2 byte string length + string, per header) to
be meaningful
 * doesn't really say anything about how to parse the tag's data, so it is in
effect useless on its own.
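
(To put rough numbers on the space point, as an illustration only: a string key such
as "schemaversion" would cost 2 bytes of length prefix plus 13 bytes of characters,
i.e. 15 bytes per header in every message, versus a flat 2 bytes for an int16 id;
across the four headers in the example config above that is roughly 48 bytes versus
8 bytes, before even counting the values.)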


Regards,
Magnus




2016-11-07 18:32 GMT+01:00 Michael Pearce :

> Hi Roger,
>
> Thanks for the support.
>
> I think the key thing is to have a common key space to make an ecosystem,
> there does have to be some level of contract for people to play nicely.
>
> Having a string-keyed map or, as currently proposed in the KIP, a numerical
> key space (an int-keyed map), is a level of the contract that
> most people would expect.
>
> I think the example someone else made in a previous comment, linking to an AWS
> blog and implemented API where originally they didn’t have a header
> space but now they do, where keys are uniform but the value can be string,
> int, anything, is a good example.
>
> Having a custom MetadataSerializer is something we had played with, but we
> discounted the idea: if you want everyone to work the same way in the
> ecosystem, having this also be customizable makes it a bit harder.
> Think about making the whole message record custom serializable; this would
> be fairly tricky (though not impossible) to make work
> nicely. Having just the value customizable, we thought, is a reasonable tradeoff
> here of flexibility against the contract of interaction between different parties.
>
> Is there a particular case or benefit of having serialization customizable
> that you have in mind?
>
> Saying this it is obviously something that could be implemented, if there
> is a need. If we did go this avenue I think a defaulted serializer
> implementation should exist so for the 80:20 rule, people can just have the
> broker and clients get default behavior.
>
> Cheers
> Mike
>
> On 11/6/16, 5:25 PM, "radai"  wrote:
>
> making header _key_ serialization configurable potentially undermines the
> broad usefulness of the feature (any point along the path must 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Michael Pearce
For me 5c and 5a are almost identical.

The idea in the KIP (5a) is that the core message just has a header length and 
then the header bytes, which are then in a pre-agreed sub wire protocol as 
described.

5c, instead of having a pre-agreed wire format, allows custom serialisation of 
the header map.

The advantage of 5a is that it never closes the door on the broker one day, if 
needed, understanding the headers.

Also, 5a would allow for the future possibility of the broker handling any changes 
in the wire format, with upgrading and downgrading dependent on client version, as 
we do for the message itself.

The obvious disadvantage is no custom serialisation, but here we are just 
talking about how to serialise a vector of ints and byte[]s; is there that much 
benefit in having that be custom?

From: Roger Hoover 
Sent: Tuesday, November 8, 2016 12:01:38 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-82 - Add Record Headers

Nacho,

Thanks for the summary.  #5 is not a binary decision, right?

5a) headers could be "fully" native as proposed - meaning both clients and
brokers would be able to list all keys.
5b) headers could be inside the existing value field.  In this case, only
clients would understand the container format and brokers would remain
unchanged.
5c) headers could be inside a new "metadata" field which would be opaque
bytes as far as the core broker protocol and on-disk format (not part of
the existing value field) but understood by clients.

I guess I'm asking what the reasons are to favor 5a over 5c.  For the case
of broker plugins, those plugins could also understand the common header
format.

Cheers,

Roger


On Mon, Nov 7, 2016 at 3:25 PM, Nacho Solis 
wrote:

> ​Hey Roger.
>
> The original design involved:
> 1- a header set per message (an array of key+values)
> 2- a message level API to set/get headers.
> 3- byte[] header-values
> 4- int header-keys
> 5- headers encoded at the protocol/core level
>
>
> 1- I think most (not all) people would agree that having metadata per
> message is a good thing. Headers is one way to provide this.
>
> 2- There are many use cases for the headers. Quite a number of them are at
> the message level. Given this we expect the best way to do this is by
> giving an API at the message level.  Agreement is not at 100% here on
> providing an API to get/set headers available to all.  Some believe this
> should be done purely by interceptors instead of direct API calls.  How
> this "map" is presented to the user via the API can still being fine tuned.
>
> 3- byte[] header values allow the encoding of anything.  This is a black
> box that does not need to be understood by anybody other than the
> plugin/code that wrote the header to start with.  A plugin, if it so
> wishes, could have a custom serializer.  So in here, if somebody wanted to
> use protobuf or avro or what have you you could do that.
>
> 4- int header keys are in the proposal. This offers a very compact
> representation with an easy ability to segment the space. Coordination is
> needed in one way or another, whether ints are used or strings are used.
> In our testing ints are faster than strings... is this performance boost
> worth it?  We have differing opinions.  A lot of people would argue that
> the flexibility of strings plus their ability to have long lengths make
> coordination easier, and that compression will take care of the overhead.
> I will make a quick note that HTTP2, which in theory uses strings as
> headers uses static header compression, effectively using ints for the core
> headers and a precomputed Huffman table for other strings. (
> https://tools.ietf.org/html/rfc7541).
>
> 5- This is the big sticking point.  Should headers be done at the protocol
> level (native) or as a container/wrapper inside the V part of the message.
>
> Benefits of doing container:
> - no modification to the broker
> - no modification to the open source client.
>
> Benefits of doing native:
> - core can use headers (compaction, exactly-once, etc)
> - broker can have plugins
> - open source client can have plugins
> - no need to worry about aliasing (interoperability between headers and no
> header supporting clients)
>
>
> There are a few other benefits that seem to come bundled into the native
> implementation but could be made available in the container format.
>
> For example, we could develop a shared open source client that offers a
> container format. This would allow us to:
> - have other open source projects depend on headers
> - create a community to share plugins
>
> This container format client could be completely separate from Apache Kafka
> or it could be part of Apache Kafka. The people that would like to use
> headers can use that client, and the people that think it's an overhead can
> use the one without.
>
>
> Nacho
>
>
> On Mon, Nov 7, 2016 at 2:54 PM, Roger Hoover 
> wrote:

Re: [DISCUSS] KIP-89: Allow sink connectors to decouple flush and offset commit

2016-11-07 Thread Ewen Cheslack-Postava
I don't have much to say here since I reviewed a draft of this already. But
I did want to point out that although it expands the API for Connectors
(which I think we should be conservative about), it has the really nice
impact that it allows connectors that really care about high throughput to
avoid flushing their pipelines since they get better control over the
offsets to commit. We already supported this in the framework automatically
with source connectors, but this is a nice addition that extends that to
sink connectors as well.

-Ewen

On Fri, Nov 4, 2016 at 2:16 PM, Shikhar Bhushan 
wrote:

> Hi all,
>
> I created KIP-89 for making a Connect API change that allows for sink
> connectors to decouple flush and offset commits.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 89%3A+Allow+sink+connectors+to+decouple+flush+and+offset+commit
>
> I'd welcome your input.
>
> Best,
>
> Shikhar
>



-- 
Thanks,
Ewen


Re: Use Android App as a “Producing client” for Kafka?

2016-11-07 Thread Ewen Cheslack-Postava
Artur,

It is possible to do this, but the second approach you mentioned is much
more common. Normally people don't want to expose their Kafka cluster
publicly, so having an intermediary can be a good way to, e.g., add a layer
where you can easily filter out bad traffic. You may be able to use some of
Kafka's newer security features to get enough security for your use case,
however. Additionally, you'll likely need to tweak some of the default
settings as they are good defaults for use within a data center, but not
over the link your Android app will probably have.

-Ewen

On Fri, Nov 4, 2016 at 11:40 PM,  wrote:

> Hi folks,
>
> is it possible / does it make sense to use an Android app as a "Producing
> client" for Apache Kafka?
>
> Let's say my Android App needs to capture and analyse reaction time data.
> Goal is to collect all data and show the average reaction time in real-time
> in the App.
>
> The alternative is having an app server of some kind as an intermediary
> that accepts messages from the android app and posts them to Kafka, rather
> than having the app be a Kafka Producer on its own.
>
>
>
> See: http://stackoverflow.com/questions/40043532/how-to-use-
> android-app-as-a-producing-client-for-kafka
>
>
>
> Best Regards,
>
> Artur
>



-- 
Thanks,
Ewen


Re: Kafka Connect key.converter and value.converter properties for Avro encoding

2016-11-07 Thread Ewen Cheslack-Postava
On Mon, Nov 7, 2016 at 7:14 AM,  wrote:

> Hi Ewen,
>
> Sorry but I didn't understand much of that.
>
> I currently have an implementation of the Converter interface that uses
> Avro's
> BinaryEncoder/Decoder, SpecificDatumReader/Writer.
>
> The main mismatch I faced is that I need to use org.apache.avro.Schema for
> serialization whereas the Converter interface requires a
> org.apache.kafka.connect.data.Schema schema.
>

This was the main point I was trying to get at. The schemas that are used
in the Converter interface (parameter to fromConnectData, return value of
toConnectData) are Connect schemas. The values that go with those schemas
are *not* in a serialization-specific runtime format, i.e. you cannot just
use SpecificRecords. Instead they are in the format of the Connect data
API. The equivalent of Generic/SpecificRecord would be Struct (
http://docs.confluent.io/3.0.1/connect/javadocs/index.html?org/apache/kafka/connect/data/Struct.html
).

Although the only requirement of the Converter interface is that you
convert:

byte[] <-> (Connect Schema, Connect data value)

in practice many Converters will be decomposed into the following:

byte[] <-> (serialization format specific schema, serialization format
specific runtime data) <-> (Connect Schema, Connect data value)

i.e. in the case of Avro

byte[] <-> (Avro Schema, SpecificRecord) <-> (Connect Schema, Connect data
value)
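
(As a rough, non-authoritative sketch of that decomposition, assuming Avro's
GenericDatumWriter/Reader for the byte[] <-> Avro step; the toAvro/toConnect
helpers are placeholders for the Avro <-> Connect translation step, and error
and schema handling are simplified:)

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

public class SketchAvroConverter implements Converter {

    private org.apache.avro.Schema avroSchema;   // how this is obtained (config, topic name, ...) is up to you

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // e.g. parse the Avro schema from a config setting -- omitted in this sketch
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        if (value == null)
            return null;                                       // keep nulls as nulls (tombstones)
        GenericRecord avroRecord = toAvro(schema, value);      // Connect data -> Avro (placeholder)
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(avroSchema).write(avroRecord, encoder);
            encoder.flush();
            return out.toByteArray();                          // Avro -> byte[]
        } catch (IOException e) {
            throw new DataException("Avro serialization failed", e);
        }
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        if (value == null)
            return SchemaAndValue.NULL;                        // no schema can be recovered from a null
        try {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(value, null);
            GenericRecord avroRecord =
                new GenericDatumReader<GenericRecord>(avroSchema).read(null, decoder);  // byte[] -> Avro
            return toConnect(avroRecord);                      // Avro -> Connect data (placeholder)
        } catch (IOException e) {
            throw new DataException("Avro deserialization failed", e);
        }
    }

    // Placeholders for the Avro <-> Connect Data API translation discussed above.
    private GenericRecord toAvro(Schema schema, Object value) { throw new UnsupportedOperationException(); }
    private SchemaAndValue toConnect(GenericRecord record) { throw new UnsupportedOperationException(); }
}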



>
> In the absence of a transformer to interconvert between these Schema
> representations (are any available?) I have, for now, gone for the slightly
> fragile approach of inferring the schema from the topic name (we currently
> have a topic per event type).  This means I ignore the schema parameter in
> fromConnectData and return a null schema in toConnectData.
>

Our AvroConverter has a class that does conversion of Avro <-> Connect:
https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
This is combined with our normal Kafka de/serializers to perform the
complete conversion. Note, however, that this API is internal and not
guaranteed to be stable.


>
> With this I can create a simple Kafka consumer that correctly reads these
> binary Avro encoded events generated by my Kafka Connect source, once I've
> set the Kafka value.deserializer property to my serializer class which
> implements Deserializer, which in turn (re)uses my Kafka
> Connect converter class internally.
>
> However, I've noticed something odd: the fromConnectData  invocations come
> in 2 forms:
>
> 1. schema = null, record = null
> 2. schema = Schema{BYTES}, record = a JSON structure


> Schema{BYTES} is, I presume, because I specify Schema.BYTES_SCHEMA as the
> 4th arg to the SourceRecord ctr.
>
> Any idea why form 1 occurs?
>

Form 1 isn't even the only form that can occur. Connect actually supports
schemaless data as well, where you would see (schema=null, record=some Java
object). (Though you will never see Structs since they require
a schema; however, complex data can be represented as maps & lists.)

However, for that particular case you are almost definitely seeing this
from converting a null value in Kafka. If the value in Kafka is null,
there's no way to know what the intended schema was, so it has to be left
blank. However, since null is used in compacted topics for deletion, it is
important to actually translate null values in Connect to be true nulls in
Kafka.
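
(A tiny illustrative sketch of the source side; the partition/offset maps, topic
name, and payload below are made up:)

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class SourceRecordForms {
    public static void main(String[] args) {
        Map<String, ?> partition = Collections.singletonMap("source", "example"); // hypothetical
        Map<String, ?> offset = Collections.singletonMap("pos", 42L);             // hypothetical

        // Form 2: the converter's fromConnectData sees schema = Schema{BYTES}, value = the payload
        SourceRecord withValue = new SourceRecord(partition, offset, "events",
                Schema.BYTES_SCHEMA, new byte[] {1, 2, 3});

        // Form 1: schema = null, value = null, e.g. a delete marker for a compacted topic;
        // since the stored value is just null, no schema can be attached to it.
        SourceRecord tombstone = new SourceRecord(partition, offset, "events", null, null);

        System.out.println(withValue);
        System.out.println(tombstone);
    }
}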


>
> Thanks again,
> David
>
>
>
>
>
>
>
> -Original Message-
> From: Ewen Cheslack-Postava [mailto:e...@confluent.io]
> Sent: 07 November 2016 04:35
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect key.converter and value.converter properties
> for Avro encoding
>
> You won't be accepting/returning SpecificRecords directly when working
> with Connect's API. Connect intentionally uses an interface that is
> different from Kafka serializers because we deal with structured data that
> the connectors need to be able to understand. We define a generic runtime
> representation for data (under org.apache.kafka.connect.data) and
> Converters are responsible for taking the data all the way through any
> byte[] -> serialization-specific format (e.g. SpecificRecord) -> Connect
> Data API.
>
> Even though your approach to handling Avro isn't exactly the same, I'd
> still suggest taking a look at our implementation. You'll be able to see
> how we separate this into those two steps, utilizing our normal
> Avro(De)Serializer to do byte[] <-> Avro conversions and then a separate
> class to do Avro <-> Connect Data API conversions. You could probably reuse
> the Avro <-> Connect Data API directly and only use the small bit of code
> you included for doing the byte[] <-> Avro conversion.
>
> re: configure(), yes, it's safe for it to be a noop as long as your
> Converter really doesn't require *any* configuration. But I would guess it
> at least needs to know the SpecificRecord class or schema you are trying to
> 

[GitHub] kafka pull request #2111: throw exception when the connection is error.

2016-11-07 Thread huyanping
GitHub user huyanping opened a pull request:

https://github.com/apache/kafka/pull/2111

throw exception when the connection is error.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huyanping/kafka trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2111.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2111


commit 7f92c7bd55568d813bbe787478572a6f40fc0780
Author: huyanping 
Date:   2016-11-08T04:21:46Z

throw exception when the connection is error.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4389) kafka-server.stop.sh not work

2016-11-07 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15646319#comment-15646319
 ] 

huxi commented on KAFKA-4389:
-

Seems it is a duplicate of issue 4297. 
https://issues.apache.org/jira/browse/KAFKA-4297

> kafka-server.stop.sh not work
> -
>
> Key: KAFKA-4389
> URL: https://issues.apache.org/jira/browse/KAFKA-4389
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.1.0
> Environment: centos7
>Reporter: JianwenSun
>
> The /proc/pid/cmdline is limited to 4096 bytes, so ps ax | grep 'kafka/.kafka' does 
> not work.  I also don't want to use jps.  Any other ways? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4389) kafka-server.stop.sh not work

2016-11-07 Thread JianwenSun (JIRA)
JianwenSun created KAFKA-4389:
-

 Summary: kafka-server.stop.sh not work
 Key: KAFKA-4389
 URL: https://issues.apache.org/jira/browse/KAFKA-4389
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 0.10.1.0
 Environment: centos7
Reporter: JianwenSun


The /proc/pid/cmdline is limited to 4096 bytes, so ps ax | grep 'kafka/.kafka' does not 
work.  I also don't want to use jps.  Any other ways? 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4386) Producer Metrics Explanation

2016-11-07 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15646087#comment-15646087
 ] 

huxi commented on KAFKA-4386:
-

In my understanding, these metrics are all request-level. Take request latency 
as an example: it measures the average request latency, which is the 
difference between the response received time and the request creation time. As 
for multiple producers, each producer has its own client id, and in turn 
has its own measurements.

> Producer Metrics Explanation
> 
>
> Key: KAFKA-4386
> URL: https://issues.apache.org/jira/browse/KAFKA-4386
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Pratik kumar
>  Labels: producer
>
> Context :
> Kafka Producer 0.8.x
> Problem:
> Kafka Producer emits metrics regarding request size stats, request latency 
> and request rate stats.
> But the inherent meaning of these metrics is not clear. What does this 
> measure?
> Is it for each producer send request (which contains batches of messages per 
> broker)? Or is it for a batch of messages defined according to user batching 
> policy? What happens when some application code has multiple async producers 
> to increase performance (how are rate and percentiles measured?)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-07 Thread Jun Rao
Hi, Rajini,

A couple of other questions on the KIP.

10. For the config values stored in ZK, are those keys (s, t, k, i, etc)
stored under scram-sha-256 standard?

11. Could KIP-48 (delegation token) use this KIP to send delegation tokens?
In KIP-48, the client sends a HMAC as the delegation token to the server.
Not sure how this gets mapped to the username/password in this KIP.

Thanks,

Jun

On Tue, Oct 4, 2016 at 6:43 AM, Rajini Sivaram  wrote:

> Hi all,
>
> I have just created KIP-84 to add SCRAM-SHA-1 and SCRAM-SHA-256 SASL
> mechanisms to Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 84%3A+Support+SASL+SCRAM+mechanisms
>
>
> Comments and suggestions are welcome.
>
> Thank you...
>
> Regards,
>
> Rajini
>


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-07 Thread Jun Rao
Regarding the use cases, one potential use case is schema registration. For
example, in Avro, a null value corresponds to a Null schema. So, if you
want to be able to keep the schema id in a delete message, the value can't
be null. We could get around this issue by special-casing the null value during
schema registration, though.

Now for the proposed changes. We probably should preserve client
compatibility. If a client application is sending a null value to a
compacted topic, ideally, it should work the same after the client upgrades.

I am not sure about making the tombstone marker configurable, especially at
the topic level. Should we allow users to change the config values back and
forth, and what would be the implication?
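
(For concreteness, a minimal sketch, not the actual LogCleaner logic, of the kind
of check being discussed: a record counts as a delete marker if either the proposed
attribute bit is set or, for compatibility with existing clients, its value is null.)

final class TombstoneCheckSketch {
    static boolean isDeleteMarker(boolean tombstoneBitSet, byte[] value) {
        return tombstoneBitSet || value == null;
    }

    static boolean shouldRetain(boolean tombstoneBitSet, byte[] value,
                                boolean withinDeleteRetentionMs) {
        // Delete markers are only kept while still inside delete.retention.ms,
        // so lagging consumers get a chance to observe the deletion.
        return !isDeleteMarker(tombstoneBitSet, value) || withinDeleteRetentionMs;
    }
}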

Thanks,

Jun

On Mon, Nov 7, 2016 at 10:48 AM, Becket Qin  wrote:

> Hi Michael,
>
> Yes, changing the logic in the log cleaner makes sense. There could be some
> other things worth thinking about (e.g. the message size change after conversion),
> though.
>
> The scenario I was thinking is the following:
> Imagine a distributed caching system built on top of Kafka. A user is
> consuming from a topic and it is guaranteed that if the user consume to the
> log end it will get the latest value for all the keys. Currently if the
> consumer sees a null value it knows the key has been removed. Now let's say
> we rolled out this change. And the producer applies a message with the
> tombstone flag set, but the value was not null. When we append that message
> to the log I suppose we will not do the down conversion if the broker has
> set the message.format.version to the latest. Because the log cleaner won't
> touch the active log segment, that message will be sitting in the active
> segment as is. Now when a consumer that hasn't upgraded yet consumes that
> tombstone message in the active segment, it seems that the broker will need
> to down convert that message to remove the value, right? In this case, we
> cannot wait for the log cleaner to do the down conversion because that
> message may have already been consumed before the log compaction happens.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 7, 2016 at 9:59 AM, Michael Pearce 
> wrote:
>
> > Hi Becket,
> >
> > We were thinking more about having the logic that’s in the method
> > shouldRetainMessage configurable via http://kafka.apache.org/
> > documentation.html#brokerconfigs at a broker/topic level. And then scrap
> > auto converting the message, and allow organisations to manage the rollout
> > of enabling the feature.
> > (this isn’t in documentation but in response to the discussion thread as
> > an alternative approach to roll out the feature)
> >
> > Does this make any more sense?
> >
> > Thanks
> > Mike
> >
> > On 11/3/16, 2:27 PM, "Becket Qin"  wrote:
> >
> > Hi Michael,
> >
> > Do you mean using a new configuration or is it just the existing
> > message.format.version config? It seems the message.format.version config
> > is enough in this case. And the default value would always be the latest
> > version.
> >
> > > Message version migration would be handled as like in KIP-32
> >
> > Also just want to confirm on this. Today if an old consumer consumes a log
> > compacted topic and sees an empty value, it knows that is a tombstone.
> > After we start to use the attribute bit, a tombstone message can have a
> > non-empty value. So by "like in KIP-32" you mean we will remove the value
> > to down convert the message if the consumer version is old, right?
> >
> > Thanks.
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Nov 2, 2016 at 1:37 AM, Michael Pearce <
> michael.pea...@ig.com>
> > wrote:
> >
> > > Hi Joel , et al.
> > >
> > > Any comments on the below idea to handle roll out / compatibility of this
> > > feature, using a configuration?
> > >
> > > Does it make sense / is it clear?
> > > Does it add value?
> > > Do we want to enforce flag by default, or value by default, or both?
> > >
> > > Cheers
> > > Mike
> > >
> > >
> > > On 10/27/16, 4:47 PM, "Michael Pearce" 
> > wrote:
> > >
> > > Thanks, James, I think this is a really good addition to the KIP
> > > details, please feel free to amend the wiki/add the use cases, also if any
> > > others you think of. I definitely think it's worthwhile documenting. If you
> > > can’t, let me know and I'll add them next week (just leaving for a long weekend
> > > off)
> > >
> > > Re Joel and others' comments about upgrade and compatibility.
> > >
> > > Rather than trying to auto manage this.
> > >
> > > Actually maybe we make a configuration option, both at server and per
> > > topic level, to control the behavior of how the server logic should work out
> > > if the record is 

Re: [DISCUSS] KIP-84: Support SASL/SCRAM mechanisms

2016-11-07 Thread Jun Rao
I think Magnus brought up the point of bumping up the version of
SaslHandshakeRequest
during the discussion of KIP-35. A broker may be capable of supporting a
list of SASL mechanisms X and may be enabling a list of SASL mechanisms Y.
Y will be a subset of X. Now, from a client's perspective, it may be useful
to differentiate the two cases. For example, the client could at least give
different error messages. If a broker is not capable of supporting a SASL
mechanism, the error could say "please upgrade the broker to a particular
version". If the broker is capable, but not enabled with a particular SASL
mechanism, the error could just say "please enable this SASL mechanism in
the broker". We will be able to distinguish these two cases if we bump up
the version of SaslHandshakeRequest every time we add the support for a new
SASL mechanism. Not sure how critical this is, but that's my understanding
from the KIP-35 discussion. Perhaps Magnus could chime in.

Ewen,

I am not sure that I get your comment in the brackets. ApiVersionsRequest
happens before SaslHandshakeRequest and doesn't need SASL authentication to
be completed.

Thanks,

Jun


On Thu, Nov 3, 2016 at 8:18 AM, Harsha Chintalapani  wrote:

> Agree with Rajini on not incrementing the protocol version. As brokers are
> returning the list of supported mechanisms, I don't think it warrants a
> protocol version bump.
>
> Thanks,
> Harsha
>
> On Thu, Nov 3, 2016 at 7:59 AM Rajini Sivaram <
> rajinisiva...@googlemail.com>
> wrote:
>
> > Ismael,
> >
> > 2) I left out MD5 because it is insecure, but thought of keeping SHA-1
> > since it is supported by most services that support SCRAM today. Since
> > there is no actual requirement to support SHA-1 and we don't really want
> to
> > store credentials in Zookeeper using SHA-1, it makes sense not to support
> > SHA-1. Will update the KIP. Thanks.
> >
> > On Thu, Nov 3, 2016 at 1:25 PM, Ismael Juma  wrote:
> >
> > > Hi all,
> > >
> > > I'm currently with limited internet access so I have not done a proper
> > > review of the KIP, my apologies. A couple of thoughts:
> > >
> > > 1. I agree with Rajini and I don't see the value in bumping the
> protocol
> > > version in this case. As Rajini said, the enabled mechanisms are
> > > independent of the broker version. If and when we have a feature API,
> we
> > > can consider exposing it that way.
> > >
> > > 2. Do we really want to support sha1? The security of it has been
> > > questioned and the general recommendation is to avoid it these days.
> > >
> > > Ismael
> > >
> > > On 3 Nov 2016 6:51 am, "Rajini Sivaram" 
> > > wrote:
> > >
> > > > I don't have a strong objection to bumping up SaslHandshakeRequest
> > > version,
> > > > though I am still not convinced of the usefulness. I do agree that
> > KIP-35
> > > > should be standard way to determine client/broker compatibility. But
> I
> > am
> > > > not sure ApiVersionsRequest is a good fit for clients to choose a
> SASL
> > > > mechanism.
> > > >
> > > >1. SaslHandshakeRequest is the only way a client can figure out
> if a
> > > >SASL mechanism is actually enabled in the broker. The fact that
> > broker
> > > >version n supports SCRAM doesn't imply that a broker of version n
> > > > actually
> > > >has the mechanism enabled. Since enabling a mechanism involves
> some
> > > > effort
> > > >like installing an authentication server and/or configuring
> > > credentials,
> > > >SASL mechanisms are a feature of a broker installation very unlike
> > > >versions.  As you said, "features" would have worked better.
> > > >2. New SASL mechanisms can be added to older Kafka broker
> versions.
> > > With
> > > >some tweaking, the SCRAM implementation from KIP-84 can be enabled
> > in
> > > a
> > > >0.10.0 broker without changing any broker code. KIP-86 would make
> > this
> > > > even
> > > >easier, but it is already possible to add new or custom mechanisms
> > to
> > > >existing broker versions. A client using ApiVersionsRequest to
> > choose
> > > a
> > > >SASL mechanism is only checking when a mechanism was included in a
> > > > broker
> > > >release, not the "capability" of a broker to support the
> mechanism.
> > I
> > > am
> > > >not sure we should encourage clients to choose mechanisms based on
> > > > versions.
> > > >3. Clients need additional configuration based on the chosen
> > > mechanism.
> > > >One of the reasons I couldn't see any value in using
> > > ApiVersionsRequest
> > > > in
> > > >the Java client was because clients are configured with a single
> > SASL
> > > >mechanism and a JAAS configuration corresponding to that
> mechanism.
> > > If a
> > > >client wants to choose between Kerberos and SCRAM, the client
> would
> > > need
> > > >keytab/principal for kerberos and username/password for SCRAM.
> > Clients
> > > > that
> > > >possess multiple credentials 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Roger Hoover
Nacho,

Thanks for the summary.  #5 is not a binary decision, right?

5a) headers could be "fully" native as proposed - meaning both clients and
brokers would be able to list all keys.
5b) headers could be inside the existing value field.  In this case, only
clients would understand the container format and brokers would remain
unchanged.
5c) headers could be inside a new "metadata" field which would be opaque
bytes as far as the core broker protocol and on-disk format (not part of
the existing value field) but understood by clients.

I guess I'm asking what the reasons are to favor 5a over 5c.  For the case
of broker plugins, those plugins could also understand the common header
format.

Cheers,

Roger


On Mon, Nov 7, 2016 at 3:25 PM, Nacho Solis 
wrote:

> ​Hey Roger.
>
> The original design involved:
> 1- a header set per message (an array of key+values)
> 2- a message level API to set/get headers.
> 3- byte[] header-values
> 4- int header-keys
> 5- headers encoded at the protocol/core level
>
>
> 1- I think most (not all) people would agree that having metadata per
> message is a good thing. Headers is one way to provide this.
>
> 2- There are many use cases for the headers. Quite a number of them are at
> the message level. Given this we expect the best way to do this is by
> giving an API at the message level.  Agreement is not at 100% here on
> providing an API to get/set headers available to all.  Some believe this
> should be done purely by interceptors instead of direct API calls.  How
> this "map" is presented to the user via the API can still being fine tuned.
>
> 3- byte[] header values allow the encoding of anything.  This is a black
> box that does not need to be understood by anybody other than the
> plugin/code that wrote the header to start with.  A plugin, if it so
> wishes, could have a custom serializer.  So in here, if somebody wanted to
> use protobuf or avro or what have you you could do that.
>
> 4- int header keys are in the proposal. This offers a very compact
> representation with an easy ability to segment the space. Coordination is
> needed in one way or another, whether ints are used or strings are used.
> In our testing ints are faster than strings... is this performance boost
> worth it?  We have differing opinions.  A lot of people would argue that
> the flexibility of strings plus their ability to have long lengths make
> coordination easier, and that compression will take care of the overhead.
> I will make a quick note that HTTP2, which in theory uses strings as
> headers uses static header compression, effectively using ints for the core
> headers and a precomputed Huffman table for other strings. (
> https://tools.ietf.org/html/rfc7541).
>
> 5- This is the big sticking point.  Should headers be done at the protocol
> level (native) or as a container/wrapper inside the V part of the message.
>
> Benefits of doing container:
> - no modification to the broker
> - no modification to the open source client.
>
> Benefits of doing native:
> - core can use headers (compaction, exactly-once, etc)
> - broker can have plugins
> - open source client can have plugins
> - no need to worry about aliasing (interoperability between headers and no
> header supporting clients)
>
>
> There are a few other benefits that seem to come bundled into the native
> implementation but could be made available in the container format.
>
> For example, we could develop a shared open source client that offers a
> container format. This would allow us to:
> - have other open source projects depend on headers
> - create a community to share plugins
>
> This container format client could be completely separate from Apache Kafka
> or it could be part of Apache Kafka. The people that would like to use
> headers can use that client, and the people that think it's an overhead can
> use the one without.
>
>
> Nacho
>
>
> On Mon, Nov 7, 2016 at 2:54 PM, Roger Hoover 
> wrote:
>
> > Radai,
> >
> > If the broker must parse headers, then I agree that the serialization
> > probably should not be configurable.  However, the if the broker sees
> > metadata only as bytes and clients are the only components that serialize
> > and deserialize the headers, then pluggability seems reasonable.
> >
> > Cheers,
> >
> > Roger
> >
> > On Sun, Nov 6, 2016 at 9:25 AM, radai 
> wrote:
> >
> > > making header _key_ serialization configurable potentially undermines the
> > > broad usefulness of the feature (any point along the path must be able to
> > > read the header keys. the values may be whatever and require more
> > intimate
> > > knowledge of the code that produced specific headers, but keys should
> be
> > > universally readable).
> > >
> > > it would also make it hard to write really portable plugins - say i
> > wrote a
> > > large message splitter/combiner - if i rely on key "largeMessage" and
> > > values of the form "1/20" someone who 

[jira] [Commented] (KAFKA-4353) Add semantic types to Kafka Connect

2016-11-07 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645840#comment-15645840
 ] 

Ewen Cheslack-Postava commented on KAFKA-4353:
--

[~rhauch] Some of these make sense to me, others don't as much. UUID is an 
example that I think most programming languages have as a built-in now, so 
probably makes more sense as a native type (although interestingly, I would 
have represented it as bytes, not in string form). JSON might be a good example 
of the opposite, where if you're really intent on not passing it through 
Connect (and it'd be painful for every Converter to have to also support JSON), 
then I agree just naming the type should be enough.

There's a bit more to my concern around a large # of logical types than just 
Converters having to support them. The good thing w/ Converters is that there 
are bound to be relatively few of them, so while adding more types is annoying, 
it's not the end of the world. But if there are 40 specialized types, do we 
actually think connectors are commonly going to be able to do something useful 
with them? I just worry about having 15 different types for time since most 
systems in practice only have a couple (the fact that you're looking at CDC is 
probably why you're seeing a lot more, but there it doesn't look to me like 
there's actually a lot of overlap).

I think this is just a matter of impedance mismatch between different systems 
and how far we think it makes sense to bend over backwards to preserve as much 
info as possible vs where reasonable compromises can be made that make the 
story for Converter/Connector developers sane (and, frankly, users since once 
the data exits connect, they presumably need to understand all the types that 
can be emitted as well).

I think the idea of semantic types makes sense -- we wanted to be able to name 
types for exactly this reason (beyond even these close-to-primitive types). You 
can of course do this already with your own names, I think you're just trying 
to get coordination between source and sink connectors (and maybe other 
applications if they maintain & know to look at the schema name) since you'd 
prefer not to do this with debezium-specific names? Will all of the ones you 
listed actually make sense for applications? Take MicroTime vs NanoTime as an 
example -- they end up eating up the same storage anyway, would it make sense 
to just do it all as NanoTime (whereas MilliTimestamp and MicroTimestamp cover 
different possible ranges of time).

It might also make sense to try to get some feedback from the community as to 
which of these they'd use (and which might be missing, including logical 
types). It's a lot more compelling to hear that a dozen connectors are 
providing UUID as just a string because they don't have a named type.
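
(As a sketch of what naming your own types looks like with the existing Connect
schema API; the io.example.* names are purely illustrative, not proposed semantic
types:)

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class NamedTypeSketch {
    // A "semantic" UUID type: just a named STRING schema that source and sink
    // connectors agree to interpret as a UUID.
    public static final Schema UUID_SCHEMA = SchemaBuilder.string()
            .name("io.example.Uuid")
            .version(1)
            .doc("UUID in its canonical string form")
            .build();

    public static void main(String[] args) {
        Schema recordSchema = SchemaBuilder.struct()
                .name("io.example.Event")
                .field("id", UUID_SCHEMA)
                .field("payload", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        Struct value = new Struct(recordSchema)
                .put("id", java.util.UUID.randomUUID().toString())
                .put("payload", "hello");

        // A sink connector can inspect value.schema().field("id").schema().name()
        // to decide whether to treat the string as a UUID.
        System.out.println(value);
    }
}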

> Add semantic types to Kafka Connect
> ---
>
> Key: KAFKA-4353
> URL: https://issues.apache.org/jira/browse/KAFKA-4353
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.1
>Reporter: Randall Hauch
>Assignee: Ewen Cheslack-Postava
>
> Kafka Connect's schema system defines several _core types_ that consist of:
> * STRUCT
> * ARRAY
> * MAP
> plus these _primitive types_:
> * INT8
> * INT16
> * INT32
> * INT64
> * FLOAT32
> * FLOAT64
> * BOOLEAN
> * STRING
> * BYTES
> The {{Schema}} for these core types define several attributes, but they do 
> not have a name.
> Kafka Connect also defines several _logical types_ that are specializations 
> of the primitive types and _do_ have schema names _and_ are automatically 
> mapped to/from Java objects:
> || Schema Name || Primitive Type || Java value class || Description ||
> | o.k.c.d.Decimal | {{BYTES}} | {{java.math.BigDecimal}} | An 
> arbitrary-precision signed decimal number. |
> | o.k.c.d.Date | {{INT32}} | {{java.util.Date}} | A date representing a 
> calendar day with no time of day or timezone. The {{java.util.Date}} value's 
> hours, minutes, seconds, milliseconds are set to 0. The underlying 
> representation is an integer representing the number of standardized days 
> (based on a number of milliseconds with 24 hours/day, 60 minutes/hour, 60 
> seconds/minute, 1000 milliseconds/second with n) since Unix epoch. |
> | o.k.c.d.Time | {{INT32}} | {{java.util.Date}} | A time representing a 
> specific point in a day, not tied to any specific date. Only the 
> {{java.util.Date}} value's hours, minutes, seconds, and milliseconds can be 
> non-zero. This effectively makes it a point in time during the first day 
> after the Unix epoch. The underlying representation is an integer 
> representing the number of milliseconds after midnight. |
> | o.k.c.d.Timestamp | {{INT32}} | {{java.util.Date}} | A timestamp 
> representing an absolute time, without timezone 

Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update

2016-11-07 Thread Vahid S Hashemian
Hi Jason,

Thanks for your feedback.

Yes, the intent of the KIP is to make the existing offsets of the group 
available even when there are no active consumers in the group consuming 
from one or more topic partitions.
Your suggestion should also work. I'm not yet sure how to obtain all of the 
group's topic partitions and need to look more closely at your 'null=all' 
suggestion, as I couldn't immediately see an obvious way to extract that 
info in the OffsetFetch handler. I'll dig deeper.
Just a quick note that using the OffsetFetch API would not address the 
second (but minor) problem described in the current KIP (the dummy group 
member). I assume that is not a big concern.

Thanks.
--Vahid




From:   Jason Gustafson 
To: dev@kafka.apache.org
Date:   11/07/2016 09:19 AM
Subject:Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update



Hey Vahid,

Thanks for the KIP. If I understand correctly, the problem is how to fetch
existing offsets for a group which has no active members, right? I'm not
totally clear why we need to modify the DescribeGroups API in order to
achieve this since we already have the OffsetFetch API. I think the
limitation currently is that you need to know the partitions to fetch
offsets for, but perhaps we could modify it to support the "null=all"
semantics that we used for the TopicMetadata API?

Thanks,
Jason

On Thu, Nov 3, 2016 at 11:09 AM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi all,
>
> I started a new KIP under
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 88%3A+DescribeGroups+Protocol+Update
> .
>
> The KIP is a proposal to update the DescribeGroups protocol to address
> KAFKA-3853 (https://issues.apache.org/jira/browse/KAFKA-3853).
>
> I appreciate your feedback.
>
> Thanks.
> --Vahid
>
>






Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Nacho Solis
​Hey Roger.

The original design involved:
1- a header set per message (an array of key+values)
2- a message level API to set/get headers.
3- byte[] header-values
4- int header-keys
5- headers encoded at the protocol/core level


1- I think most (not all) people would agree that having metadata per
message is a good thing. Headers is one way to provide this.

2- There are many use cases for the headers. Quite a number of them are at
the message level. Given this we expect the best way to do this is by
giving an API at the message level.  Agreement is not at 100% here on
providing an API to get/set headers available to all.  Some believe this
should be done purely by interceptors instead of direct API calls.  How
this "map" is presented to the user via the API can still being fine tuned.

3- byte[] header values allow the encoding of anything.  This is a black
box that does not need to be understood by anybody other than the
plugin/code that wrote the header to start with.  A plugin, if it so
wishes, could have a custom serializer.  So in here, if somebody wanted to
use protobuf or avro or what have you you could do that.

4- int header keys are in the proposal. This offers a very compact
representation with an easy ability to segment the space. Coordination is
needed in one way or another, whether ints are used or strings are used.
In our testing ints are faster than strings... is this performance boost
worth it?  We have differing opinions.  A lot of people would argue that
the flexibility of strings plus their ability to have long lengths make
coordination easier, and that compression will take care of the overhead.
I will make a quick note that HTTP2, which in theory uses strings as
headers uses static header compression, effectively using ints for the core
headers and a precomputed Huffman table for other strings. (
https://tools.ietf.org/html/rfc7541).

5- This is the big sticking point.  Should headers be done at the protocol
level (native) or as a container/wrapper inside the V part of the message.

Benefits of doing container:
- no modification to the broker
- no modification to the open source client.

Benefits of doing native:
- core can use headers (compaction, exactly-once, etc)
- broker can have plugins
- open source client can have plugins
- no need to worry about aliasing (interoperability between headers and no
header supporting clients)


There are a few other benefits that seem to come bundled into the native
implementation but could be made available in the container format.

For example, we could develop a shared open source client that offers a
container format. This would allow us to:
- have other open source projects depend on headers
- create a community to share plugins

This container format client could be completely separate from Apache Kafka
or it could be part of Apache Kafka. The people that would like to use
headers can use that client, and the people that think it's an overhead can
use the one without.


Nacho


On Mon, Nov 7, 2016 at 2:54 PM, Roger Hoover  wrote:

> Radai,
>
> If the broker must parse headers, then I agree that the serialization
> probably should not be configurable.  However, the if the broker sees
> metadata only as bytes and clients are the only components that serialize
> and deserialize the headers, then pluggability seems reasonable.
>
> Cheers,
>
> Roger
>
> On Sun, Nov 6, 2016 at 9:25 AM, radai  wrote:
>
> > making header _key_ serialization configurable potentially undermines the
> > broad usefulness of the feature (any point along the path must be able to
> > read the header keys. the values may be whatever and require more
> intimate
> > knowledge of the code that produced specific headers, but keys should be
> > universally readable).
> >
> > it would also make it hard to write really portable plugins - say i
> wrote a
> > large message splitter/combiner - if i rely on key "largeMessage" and
> > > values of the form "1/20" someone who uses (contrived example)
> > > Map<…, Double> wouldn't be able to re-use my code.
> >
> > not the end of the world within an organization, but problematic if you
> > want to enable an ecosystem
> >
> > On Thu, Nov 3, 2016 at 2:04 PM, Roger Hoover 
> > wrote:
> >
> > >  As others have laid out, I see strong reasons for a common message
> > > metadata structure for the Kafka ecosystem.  In particular, I've seen
> > that
> > > even within a single organization, infrastructure teams often own the
> > > message metadata while application teams own the application-level data
> > > format.  Allowing metadata and content to have different structure and
> > > evolve separately is very helpful for this.  Also, I think there's a
> lot
> > of
> > > value to having a common metadata structure shared across the Kafka
> > > ecosystem so that tools which leverage metadata can more easily be
> shared
> > > across organizations and integrated together.
> 

Re: [DISCUSS] KAFKA-4345: Run decktape test for each pull request

2016-11-07 Thread Ewen Cheslack-Postava
On Mon, Nov 7, 2016 at 10:30 AM Raghav Kumar Gautam 
wrote:

> Hi Ewen,
>
> Thanks for the feedback. Answers are inlined.
>
> On Sun, Nov 6, 2016 at 8:46 PM, Ewen Cheslack-Postava 
> wrote:
>
> > Yeah, I'm all for getting these to run more frequently and on lighter
> > weight infrastructure. (By the way, I also saw the use of docker; I'd
> > really like to get a "native" docker cluster type into ducktape at some
> > point so all you have to do is bake the image and then spawn containers
> on
> > demand.)
> >
> I completely agree, supporting docker integration in ducktape would be the
> ideal solution of the problem.
>
>
> >
> > A few things. First, it'd be nice to know if we can chain these with
> normal
> > PR builds or something like that. Even starting the system tests when we
> > don't know the unit tests will pass seems like it'd be wasteful.
> >
> If we do chaining, one problem it will bring is that the turnaround
> time will suffer. It would take 1.5 hrs to run unit tests, then another 1.5
> hrs to run ducktape tests. Also, don't devs run relevant unit tests before
> they submit a patch?
>

Yeah, I get that. Turnaround time will obviously suffer from serializing
anything. Here the biggest problem today is that Jenkins builds are not as
highly parallelized as most users' local runs, and the large
number of integration tests that are baked into the unit tests means they
take quite a long time. While the time to run the tests locally has been creeping
up quite a bit recently, it's still under 15 min on a relatively recent
MBP. Ideally we could just get the Jenkins builds to finish faster...


> >
> > Second, agreed on getting things stable before turning this on across the
> > board.
>
> I have done some work on stabilizing the tests. But I need help from the Kafka
> community to take this further. It would be great if someone can guide me on
> how to do this. Should we start with a subset of tests that are stable and
> enable others as we make progress? Who are the people I can work with
> on this problem?
>

It'll probably be a variety of people because it depends on the components
that are unstable. For example, just among committers, different folks know
different areas of the code (and especially system tests) to different
degrees. I can probably help across the board in terms of ducktape/system
test stuff, but for any individual test you'll probably just want to git
blame to figure out who might be best to ask for help.

I can take a pass at this patch and see how much makes sense to commit
immediately. If we don't immediately start getting feedback on failing
tests and can instead make progress by requesting them manually on only
some PRs or something like that, then that seems like it could be
reasonable.

My biggest concern, just taking a quick pass at the changes, is that we're
doing a lot of renaming of tests just to split them up rather than by
logical grouping. If we need to do this, it seems much better to add a
small amount of tooling to ducktape to execute subsets of tests (e.g. split
across N subsets of the tests). It requires more coordination between
ducktape and getting this landed, but feels like a much cleaner solution,
and one that could eventually take advantage of additional information
(e.g. if it knows avg runtime from previous runs, then it can divide them
based on that instead of only considering the # of tests).


> > Confluent runs these tests nightly on full VMs in AWS and
> > historically, besides buggy logic in tests, underprovisioned resources
> tend
> > to be the biggest source of flakiness in tests.
> >
>  Good to know that I am not the only one worrying about this problem :-)
>
> Finally, should we be checking w/ infra and/or Travis folks before enabling
> > something this expensive? Are the Storm integration tests of comparable
> > cost? There are some in-flight patches for parallelizing test runs of
> > ducktape tests (which also results in better utilization). But even with
> > those changes, the full test run is still quite a few VM-hours per PR and
> > we only expect it to increase.
> >
> We can ask infra people about this. But I think this will not be a problem.
> For example, Flink  is
> using 11 hrs of computation time for each run. For Kafka we are going to
> start with 6hrs. Also, with the docker setup we can bring up the whole 12
> node cluster on the laptop and run ducktape tests against it. So, test
> development cycles will become faster.
>

Sure, it's just that over time this tends to lead to the current state of
the Jenkins where it can take many hours before you get any feedback
because things are so backed up.

-Ewen


>
> With Regards,
> Raghav.
>
>
>
> >
> > -Ewen
> >
> > On Thu, Nov 3, 2016 at 11:26 AM, Becket Qin 
> wrote:
> >
> > > Thanks for the explanation, Raghav.
> > >
> > > If the workload is not a concern then it is 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Roger Hoover
Radai,

If the broker must parse headers, then I agree that the serialization
probably should not be configurable.  However, the if the broker sees
metadata only as bytes and clients are the only components that serialize
and deserialize the headers, then pluggability seems reasonable.

Cheers,

Roger

On Sun, Nov 6, 2016 at 9:25 AM, radai  wrote:

> making header _key_ serialization configurable potentially undermines the
> broad usefulness of the feature (any point along the path must be able to
> read the header keys. the values may be whatever and require more intimate
> knowledge of the code that produced specific headers, but keys should be
> universally readable).
>
> it would also make it hard to write really portable plugins - say i wrote a
> large message splitter/combiner - if i rely on key "largeMessage" and
> > values of the form "1/20" someone who uses (contrived example) Map<…, Double> wouldn't be able to re-use my code.
>
> not the end of the world within an organization, but problematic if you
> want to enable an ecosystem
>
> On Thu, Nov 3, 2016 at 2:04 PM, Roger Hoover 
> wrote:
>
> >  As others have laid out, I see strong reasons for a common message
> > metadata structure for the Kafka ecosystem.  In particular, I've seen
> that
> > even within a single organization, infrastructure teams often own the
> > message metadata while application teams own the application-level data
> > format.  Allowing metadata and content to have different structure and
> > evolve separately is very helpful for this.  Also, I think there's a lot
> of
> > value to having a common metadata structure shared across the Kafka
> > ecosystem so that tools which leverage metadata can more easily be shared
> > across organizations and integrated together.
> >
> > The question is, where does the metadata structure belong?  Here's my
> take:
> >
> > We change the Kafka wire and on-disk format to from a (key, value) model
> to
> > a (key, metadata, value) model where all three are byte arrays from the
> > brokers point of view.  The primary reason for this is that it provides a
> > backward compatible migration path forward.  Producers can start
> populating
> > metadata fields before all consumers understand the metadata structure.
> > For people who already have custom envelope structures, they can populate
> > their existing structure and the new structure for a while as they make
> the
> > transition.
> >
> > We could stop there and let the clients plug in a KeySerializer,
> > MetadataSerializer, and ValueSerializer but I think it is also be useful
> to
> > have a default MetadataSerializer that implements a key-value model
> similar
> > to AMQP or HTTP headers.  Or we could go even further and prescribe a
> > Map or Map data model for headers in the
> > clients (while still allowing custom serialization of the header data
> > model).
> >
> > I think this would address Radai's concerns:
> > 1. All client code would not need to be updated to know about the
> > container.
> > 2. Middleware friendly clients would have a standard header data model to
> > work with.
> > 3. KIP is required both b/c of broker changes and because of client API
> > changes.
> >
> > Cheers,
> >
> > Roger
> >
> >
> > On Wed, Nov 2, 2016 at 4:38 PM, radai 
> wrote:
> >
> > > my biggest issues with a "standard" wrapper format:
> > >
> > > 1. _ALL_ client _CODE_ (as opposed to kafka lib version) must be
> updated
> > to
> > > know about the container, because any old naive code trying to directly
> > > deserialize its own payload would keel over and die (it needs to know
> to
> > > deserialize a container, and then dig in there for its payload).
> > > 2. in order to write middleware-friendly clients that utilize such a
> > > container one would basically have to write their own producer/consumer
> > API
> > > on top of the open source kafka one.
> > > 3. if you were going to go with a wrapper format you really don't need
> to
> > > bother with a kip (just open source your own client stack from #2 above
> > so
> > > others could stop re-inventing it)
> > >
> > > On Wed, Nov 2, 2016 at 4:25 PM, James Cheng 
> > wrote:
> > >
> > > > How exactly would this work? Or maybe that's out of scope for this
> > email.
> > >
> >
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Roger Hoover
Magnus,

Thanks for jumping in.  Do you see a reason that the broker should
understand the header structure you've proposed? I'm wondering if metadata
should just be bytes from the broker's point of view but clients could
implement a common header serde spec on top.

Cheers,

Roger

On Mon, Nov 7, 2016 at 2:10 PM, Magnus Edenhill  wrote:

> Hi,
>
> I'm +1 for adding generic message headers, but I do share the concerns
> previously aired on this thread and during the KIP meeting.
>
> So let me propose a slimmer alternative that does not require any sort of
> global header registry, does not affect broker performance or operations,
> and adds as little overhead as possible.
>
>
> Message
> 
> The protocol Message type is extended with a Headers array consisting of
> Tags, where a Tag is defined as:
>int16 Id
>int16 Len  // binary_data length
>binary_data[Len]  // opaque binary data
>
>
> Ids
> ---
> The Id space is not centrally managed, so whenever an application needs to
> add headers, or use an eco-system plugin that does, its Id allocation will
> need to be manually configured.
> This moves the allocation concern from the global space down to
> organization level and avoids the risk for id conflicts.
> Example pseudo-config for some app:
> sometrackerplugin.tag.sourcev3.id=1000
> dbthing.tag.tablename.id=1001
> myschemareg.tag.schemaname.id=1002
> myschemareg.tag.schemaversion.id=1003
>
>
> Each header-writing or header-reading plugin must provide means (typically
> through configuration) to specify the tag for each header it uses. Defaults
> should be avoided.
> A consumer silently ignores tags it does not have a mapping for (since the
> binary_data can't be parsed without knowing what it is).
>
> Id range 0..999 is reserved for future use by the broker and must not be
> used by plugins.
>
>
>
> Broker
> -
> The broker does not process the tags (other than the standard protocol
> syntax verification), it simply stores and forwards them as opaque data.
>
> Standard message translation (removal of Headers) kicks in for older
> clients.
>
>
> Why not string ids?
> -
> String ids might seem like a good idea, but:
>  * does not really solve uniqueness
>  * consumes a lot of space (2 byte string length + string, per header) to
> be meaningful
>  * doesn't really say anything about how to parse the tag's data, so it is in
> effect useless on its own.
>
>
> Regards,
> Magnus
>
>
>
>
> 2016-11-07 18:32 GMT+01:00 Michael Pearce :
>
> > Hi Roger,
> >
> > Thanks for the support.
> >
> > I think the key thing is to have a common key space to make an ecosystem,
> > there does have to be some level of contract for people to play nicely.
> >
> > Having map or as per current proposed in kip of having a
> > numerical key space of  map is a level of the contract that
> > most people would expect.
> >
> > I think the example in a previous comment someone else made linking to
> AWS
> > blog and also implemented api where originally they didn’t have a header
> > space but now they do, where keys are uniform but the value can be
> string,
> > int, anything is a good example.
> >
> > Having a custom MetadataSerializer is something we had played with, but
> > discounted the idea, as if you wanted everyone to work the same way in
> the
> > ecosystem, having to have this also customizable makes it a bit harder.
> > Think about making the whole message record custom serializable, this
> would
> > make it fairly tricky (though it would not be impossible) to have made
> work
> > nicely. Having the value customizable we thought is a reasonable tradeoff
> > here of flexibility over contract of interaction between different
> parties.
> >
> > Is there a particular case or benefit of having serialization
> customizable
> > that you have in mind?
> >
> > Saying this it is obviously something that could be implemented, if there
> > is a need. If we did go this avenue I think a defaulted serializer
> > implementation should exist so for the 80:20 rule, people can just have
> the
> > broker and clients get default behavior.
> >
> > Cheers
> > Mike
> >
> > On 11/6/16, 5:25 PM, "radai"  wrote:
> >
> > making header _key_ serialization configurable potentially undermines
> > the
> > broad usefulness of the feature (any point along the path must be
> able
> > to
> > read the header keys. the values may be whatever and require more
> > intimate
> > knowledge of the code that produced specific headers, but keys should
> > be
> > universally readable).
> >
> > it would also make it hard to write really portable plugins - say i
> > wrote a
> > large message splitter/combiner - if i rely on key "largeMessage" and
> > values of the form "1/20" someone who uses (contrived example)
> > Map > Double> wouldn't be able to re-use my code.

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Roger Hoover
Please see comments inline.

On Mon, Nov 7, 2016 at 9:32 AM, Michael Pearce 
wrote:

> Hi Roger,
>
> Thanks for the support.
>

Thanks for leading the discussion on this.  It's an important topic.


>
> I think the key thing is to have a common key space to make an ecosystem,
> there does have to be some level of contract for people to play nicely.
>


Agreed.  There doesn't yet seem to be agreement on whether the broker needs
to understand the metadata structure or whether it's a client-level
concept.  We could define a common spec on top of the existing Kafka
protocol and require clients to implement it if they want to be
metadata-compliant.  That would have the advantage of keeping the broker
and core protocol simpler (would require no changes).  The reason I'm in
favor of making the broker aware of metadata is that it would allow a
smooth migration as clients begin using the new metadata structure.
Serializing metadata to byte[] in the protocol makes sense to me because I
don't see any reason that the broker needs to spend CPU time parsing and
validating individual headers that it doesn't care about.  Nor should the
base wire protocol and on-disk format need to commit to a particular header
structure (when it's only needed at a higher level).

I'm not necessarily opposed to defining a key-value structure in core Kafka
but don't see a strong reason to do it there when it could be done at the
client layer (while still enabling a common metadata model across the
ecosystem).  Without a strong reason, it makes sense to keep things simpler
and more efficient for the brokers (byte arrays for keys, metadata, and
values).


> Having map or as per current proposed in kip of having a
> numerical key space of  map is a level of the contract that
> most people would expect.
>

Yes, this seems good to me too.  I'm in favor of prescribing something like
this in client APIs.
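
For illustration only, a minimal sketch of what such a prescribed,
client-level header model could look like. The Headers name and its methods
below are hypothetical, not part of today's producer/consumer API:

    import java.util.Map;

    // Hypothetical client-facing header container keyed by int, per the KIP proposal.
    public interface Headers {
        Headers add(int key, byte[] value);   // returns this so calls can be chained
        byte[] get(int key);                  // null if the key was never set
        Map<Integer, byte[]> toMap();         // map view for middleware/plugins
    }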


>
> I think the example in a previous comment someone else made linking to AWS
> blog and also implemented api where originally they didn’t have a header
> space but now they do, where keys are uniform but the value can be string,
> int, anything is a good example.
>
> Having a custom MetadataSerializer is something we had played with, but
> discounted the idea, as if you wanted everyone to work the same way in the
> ecosystem, having to have this also customizable makes it a bit harder.
> Think about making the whole message record custom serializable, this would
> make it fairly tricky (though it would not be impossible) to have made work
> nicely. Having the value customizable we thought is a reasonable tradeoff
> here of flexibility over contract of interaction between different parties.
>
> Is there a particular case or benefit of having serialization customizable
> that you have in mind?
>

I guess this depends on whether we decide to encode individual headers in
the protocol or not.  If so, then custom serialization does not make
sense.  If metadata is a byte array, then it does.  The main reason for
allowing custom metadata serialization is that there already exist a lot of
good serialization solutions (Protobuf, Avro, HPACK, etc.). Kafka clients
could ship with a default header serializer and perhaps 80% of users will
just use it (as you said).  As long as the client API is key-value,
everything should interoperate as expected with custom serialization
without any application changes if you configure your custom metadata serde
in all your clients (brokers wouldn't care).
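
For what it's worth, a rough sketch of that plug point, assuming headers stay
opaque bytes to the broker (the HeaderSerde name is made up; the existing
client Serializer/Deserializer interfaces only cover keys and values today):

    import java.util.Map;

    // Hypothetical pluggable serde for the metadata byte array. A default
    // implementation could encode each entry as int16 id + int16 length + bytes,
    // while an organization could swap in Avro/Protobuf/HPACK as long as every
    // one of its clients is configured with the same implementation.
    public interface HeaderSerde {
        byte[] serialize(Map<Integer, byte[]> headers);
        Map<Integer, byte[]> deserialize(byte[] metadata);
    }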



>
> Saying this it is obviously something that could be implemented, if there
> is a need. If we did go this avenue I think a defaulted serializer
> implementation should exist so for the 80:20 rule, people can just have the
> broker and clients get default behavior.
>
> Cheers
> Mike
>
> On 11/6/16, 5:25 PM, "radai"  wrote:
>
> making header _key_ serialization configurable potentially undermines
> the
> broad usefulness of the feature (any point along the path must be able
> to
> read the header keys. the values may be whatever and require more
> intimate
> knowledge of the code that produced specific headers, but keys should
> be
> universally readable).
>
> it would also make it hard to write really portable plugins - say i
> wrote a
> large message splitter/combiner - if i rely on key "largeMessage" and
> values of the form "1/20" someone who uses (contrived example)
> Map Double> wouldn't be able to re-use my code.
>
> not the end of the world within an organization, but problematic if
> you
> want to enable an ecosystem
>
> On Thu, Nov 3, 2016 at 2:04 PM, Roger Hoover 
> wrote:
>
> >  As others have laid out, I see strong reasons for a common message
> > metadata structure for the Kafka ecosystem.  In particular, I've
> seen that
> > even within a single organization, infrastructure 

Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-07 Thread Gwen Shapira
Hey Radai,

Looking at the proposal, it looks like a major question is still unresolved?
"This configuration parameter can either replace queued.max.requests
completely, or co-exist with it (by way of either-or or respecting
both bounds and not picking up new requests when either is hit)."

On Mon, Nov 7, 2016 at 1:08 PM, radai  wrote:
> Hi,
>
> I would like to initiate a vote on KIP-72:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
>
> The kip allows specifying a limit on the amount of memory allocated for
> reading incoming requests into. This is useful for "sizing" a broker and
> avoiding OOMEs under heavy load (as actually happens occasionally at
> linkedin).
>
> I believe I've addressed most (all?) concerns brought up during the
> discussion.
>
> To the best of my understanding this vote is about the goal and
> public-facing changes related to the new proposed behavior, but as for
> implementation, i have the code up here:
>
> https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting
>
> and I've stress-tested it to work properly (meaning it chugs along and
> throttles under loads that would DOS 10.0.1.0 code).
>
> I also believe that the primitives and "pattern"s introduced in this KIP
> (namely the notion of a buffer pool and retrieving from / releasing to said
> pool instead of allocating memory) are generally useful beyond the scope of
> this KIP for both performance issues (allocating lots of short-lived large
> buffers is a performance bottleneck) and other areas where memory limits
> are a problem (KIP-81)
>
> Thank you,
>
> Radai.



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


[jira] [Assigned] (KAFKA-3910) Cyclic schema support in ConnectSchema and SchemaBuilder

2016-11-07 Thread Shikhar Bhushan (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shikhar Bhushan reassigned KAFKA-3910:
--

Assignee: Shikhar Bhushan  (was: Ewen Cheslack-Postava)

> Cyclic schema support in ConnectSchema and SchemaBuilder
> 
>
> Key: KAFKA-3910
> URL: https://issues.apache.org/jira/browse/KAFKA-3910
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: John Hofman
>Assignee: Shikhar Bhushan
>
> Cyclic schema's are not supported by ConnectSchema or SchemaBuilder. 
> Subsequently the AvroConverter (confluentinc/schema-registry) hits a stack 
> overflow when converting a cyclic avro schema, e.g:
> {code}
> {"type":"record", 
> "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}
> {code}
> This is a blocking issue for all connectors running on the connect framework 
> with data containing cyclic references. The AvroConverter cannot support 
> cyclic schema's until the underlying ConnectSchema and SchemaBuilder do.
> To reproduce the stack-overflow (Confluent-3.0.0):
> Produce some cyclic data:
> {code}
> bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test 
> --property value.schema='{"type":"record", 
> "name":"list","fields":[{"name":"value","type":"int"},{"name":"next","type":["null","list"]}]}'
> {"value":1,"next":null} 
> {"value":1,"next":{"list":{"value":2,"next":null}}}
> {code}
> Then try to consume it with connect:
> {code:title=connect-console-sink.properties}
> name=local-console-sink 
> connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector 
> tasks.max=1 
> topics=test
> {code}
> {code}
> ./bin/connect-standalone 
> ./etc/schema-registry/connect-avro-standalone.properties 
> connect-console-sink.properties  
> … start up logging … 
> java.lang.StackOverflowError 
>  at org.apache.avro.JsonProperties.getJsonProp(JsonProperties.java:54) 
>  at org.apache.avro.JsonProperties.getProp(JsonProperties.java:45) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1055) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1103) 
>  at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1137)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Magnus Edenhill
Hi,

I'm +1 for adding generic message headers, but I do share the concerns
previously aired on this thread and during the KIP meeting.

So let me propose a slimmer alternative that does not require any sort of
global header registry, does not affect broker performance or operations,
and adds as little overhead as possible.


Message

The protocol Message type is extended with a Headers array consisting of
Tags, where a Tag is defined as:
   int16 Id
   int16 Len  // binary_data length
   binary_data[Len]  // opaque binary data
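
(For illustration, a minimal Java sketch of encoding one such Tag on the
client side; this is not an actual wire-format implementation:)

    import java.nio.ByteBuffer;

    // Encodes a single Tag as int16 Id, int16 Len, then Len bytes of opaque data.
    // Assumes data.length <= Short.MAX_VALUE, since Len is an int16.
    static ByteBuffer encodeTag(short id, byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 2 + data.length);
        buf.putShort(id);                   // int16 Id
        buf.putShort((short) data.length);  // int16 Len
        buf.put(data);                      // opaque binary data
        buf.flip();
        return buf;
    }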


Ids
---
The Id space is not centrally managed, so whenever an application needs to
add headers, or use an eco-system plugin that does, its Id allocation will
need to be manually configured.
This moves the allocation concern from the global space down to
organization level and avoids the risk for id conflicts.
Example pseudo-config for some app:
sometrackerplugin.tag.sourcev3.id=1000
dbthing.tag.tablename.id=1001
myschemareg.tag.schemaname.id=1002
myschemareg.tag.schemaversion.id=1003


Each header-writing or header-reading plugin must provide means (typically
through configuration) to specify the tag for each header it uses. Defaults
should be avoided.
A consumer silently ignores tags it does not have a mapping for (since the
binary_data can't be parsed without knowing what it is).

Id range 0..999 is reserved for future use by the broker and must not be
used by plugins.
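
A possible sketch of how a plugin could resolve its configured tag ids at
startup (the property names follow the pseudo-config above; everything else
here is illustrative):

    import java.util.Properties;

    static short tagId(Properties config, String property) {
        String value = config.getProperty(property);
        if (value == null)
            throw new IllegalArgumentException("no tag id configured for " + property);
        int id = Integer.parseInt(value);
        // 0..999 is reserved for the broker, and Id is an int16 on the wire.
        if (id <= 999 || id > Short.MAX_VALUE)
            throw new IllegalArgumentException("invalid tag id " + id + " for " + property);
        return (short) id;
    }
    // e.g. short tableNameTag = tagId(config, "dbthing.tag.tablename.id");  // -> 1001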



Broker
-
The broker does not process the tags (other than the standard protocol
syntax verification), it simply stores and forwards them as opaque data.

Standard message translation (removal of Headers) kicks in for older
clients.


Why not string ids?
-
String ids might seem like a good idea, but:
 * does not really solve uniqueness
 * consumes a lot of space (2 byte string length + string, per header) to
be meaningful
 * doesn't really say anything about how to parse the tag's data, so it is in
effect useless on its own.


Regards,
Magnus




2016-11-07 18:32 GMT+01:00 Michael Pearce :

> Hi Roger,
>
> Thanks for the support.
>
> I think the key thing is to have a common key space to make an ecosystem,
> there does have to be some level of contract for people to play nicely.
>
> Having map or as per current proposed in kip of having a
> numerical key space of  map is a level of the contract that
> most people would expect.
>
> I think the example in a previous comment someone else made linking to AWS
> blog and also implemented api where originally they didn’t have a header
> space but now they do, where keys are uniform but the value can be string,
> int, anything is a good example.
>
> Having a custom MetadataSerializer is something we had played with, but
> discounted the idea, as if you wanted everyone to work the same way in the
> ecosystem, having to have this also customizable makes it a bit harder.
> Think about making the whole message record custom serializable, this would
> make it fairly tricky (though it would not be impossible) to have made work
> nicely. Having the value customizable we thought is a reasonable tradeoff
> here of flexibility over contract of interaction between different parties.
>
> Is there a particular case or benefit of having serialization customizable
> that you have in mind?
>
> Saying this it is obviously something that could be implemented, if there
> is a need. If we did go this avenue I think a defaulted serializer
> implementation should exist so for the 80:20 rule, people can just have the
> broker and clients get default behavior.
>
> Cheers
> Mike
>
> On 11/6/16, 5:25 PM, "radai"  wrote:
>
> making header _key_ serialization configurable potentially undermines
> the
> broad usefulness of the feature (any point along the path must be able
> to
> read the header keys. the values may be whatever and require more
> intimate
> knowledge of the code that produced specific headers, but keys should
> be
> universally readable).
>
> it would also make it hard to write really portable plugins - say i
> wrote a
> large message splitter/combiner - if i rely on key "largeMessage" and
> values of the form "1/20" someone who uses (contrived example)
> Map Double> wouldn't be able to re-use my code.
>
> not the end of the world within an organization, but problematic if
> you
> want to enable an ecosystem
>
> On Thu, Nov 3, 2016 at 2:04 PM, Roger Hoover 
> wrote:
>
> >  As others have laid out, I see strong reasons for a common message
> > metadata structure for the Kafka ecosystem.  In particular, I've
> seen that
> > even within a single organization, infrastructure teams often own the
> > message metadata while application teams own the application-level
> data
> > format.  Allowing metadata and content to have different structure
> 

[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645583#comment-15645583
 ] 

Joel Koshy commented on KAFKA-4362:
---

In this specific issue, the coordinator is available, but has moved to another 
broker. The client isn't informed of this movement though since it gets an 
unknown error (-1). With the Java client such errors should be automatically 
handled - i.e., rediscover the coordinator.

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-07 Thread radai
Hi,

I would like to initiate a vote on KIP-72:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests

The kip allows specifying a limit on the amount of memory allocated for
reading incoming requests into. This is useful for "sizing" a broker and
avoiding OOMEs under heavy load (as actually happens occasionally at
linkedin).

I believe I've addressed most (all?) concerns brought up during the
discussion.

To the best of my understanding this vote is about the goal and
public-facing changes related to the new proposed behavior, but as for
implementation, i have the code up here:

https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting

and I've stress-tested it to work properly (meaning it chugs along and
throttles under loads that would DOS 10.0.1.0 code).

I also believe that the primitives and "pattern"s introduced in this KIP
(namely the notion of a buffer pool and retrieving from / releasing to said
pool instead of allocating memory) are generally useful beyond the scope of
this KIP for both performance issues (allocating lots of short-lived large
buffers is a performance bottleneck) and other areas where memory limits
are a problem (KIP-81)
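
For anyone skimming, a toy sketch of that pool primitive (illustrative only;
the actual implementation is in the branch linked above):

    import java.nio.ByteBuffer;
    import java.util.concurrent.Semaphore;

    // Bounds the total number of bytes handed out for reading requests; acquire()
    // blocks (i.e. no new requests are read) once the configured bound is reached.
    class BoundedBufferPool {
        private final Semaphore availableBytes;

        BoundedBufferPool(int maxBytes) {
            this.availableBytes = new Semaphore(maxBytes);
        }

        ByteBuffer acquire(int size) throws InterruptedException {
            availableBytes.acquire(size);
            return ByteBuffer.allocate(size);
        }

        void release(ByteBuffer buffer) {
            availableBytes.release(buffer.capacity());
        }
    }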

Thank you,

Radai.


[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645427#comment-15645427
 ] 

Joel Koshy commented on KAFKA-4362:
---

Sorry - missed this comment. It does not recover because the exception is 
thrown before the point the broker determines that it is no longer the 
coordinator.

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4381) Add per partition lag metric to KafkaConsumer.

2016-11-07 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645419#comment-15645419
 ] 

Joel Koshy commented on KAFKA-4381:
---

This is up to you but I definitely agree with Jason that it's overkill to have 
KIPs for this level of improvement. Configs are similar as well unless they are 
very nuanced such as timeouts. The PR itself should be sufficient to serve as a 
forum to discuss any concerns.

Metric name changes are different since presumably people are already 
monitoring those metrics - such changes could deserve a KIP or even just an 
email heads-up.

> Add per partition lag metric to KafkaConsumer.
> --
>
> Key: KAFKA-4381
> URL: https://issues.apache.org/jira/browse/KAFKA-4381
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> Currently KafkaConsumer only has a metric of max lag across all the 
> partitions. It would be useful to know per partition lag as well.
> I remember there was a ticket created before but did not find it. So I am 
> creating this ticket.
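
For reference, the lag in question can already be computed per partition with
the existing 0.10.1 consumer API (rough illustration only; the ticket is about
exposing it as a metric):

{code}
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

static void printPerPartitionLag(KafkaConsumer<?, ?> consumer) {
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
    for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
        long lag = entry.getValue() - consumer.position(entry.getKey());
        System.out.println(entry.getKey() + " lag=" + lag);
    }
}
{code}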



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #1025

2016-11-07 Thread Apache Jenkins Server
See 

Changes:

[jason] MINOR: Fix regex on connector path param in ConnectorsResource

--
[...truncated 14184 lines...]

org.apache.kafka.streams.kstream.JoinWindowsTest > validWindows PASSED

org.apache.kafka.streams.kstream.JoinWindowsTest > 
timeDifferenceMustNotBeNegative STARTED

org.apache.kafka.streams.kstream.JoinWindowsTest > 
timeDifferenceMustNotBeNegative PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldHaveCorrectSourceTopicsForTableFromMergedStream STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldHaveCorrectSourceTopicsForTableFromMergedStream PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testMerge STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testMerge PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testFrom STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testFrom PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenTopicNamesAreNull STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenTopicNamesAreNull PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenNoTopicPresent STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > 
shouldThrowExceptionWhenNoTopicPresent PASSED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testNewName STARTED

org.apache.kafka.streams.kstream.KStreamBuilderTest > testNewName PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
shouldHaveSaneEqualsAndHashCode STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
shouldHaveSaneEqualsAndHashCode PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeZero 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeZero 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > advanceIntervalMustNotBeZero 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > advanceIntervalMustNotBeZero 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeNegative 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowSizeMustNotBeNegative 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeNegative STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeNegative PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeLargerThanWindowSize STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
advanceIntervalMustNotBeLargerThanWindowSize PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForTumblingWindows 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForTumblingWindows 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForHoppingWindows 
STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > windowsForHoppingWindows 
PASSED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
windowsForBarelyOverlappingHoppingWindows STARTED

org.apache.kafka.streams.kstream.TimeWindowsTest > 
windowsForBarelyOverlappingHoppingWindows PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
STARTED

org.apache.kafka.streams.StreamsConfigTest > defaultSerdeShouldBeConfigured 
PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldOverrideStreamsDefaultConsumerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > testGetProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowStreamsExceptionIfValueSerdeConfigFails PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfConsumerAutoCommitIsOverridden STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldThrowExceptionIfConsumerAutoCommitIsOverridden PASSED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs STARTED

org.apache.kafka.streams.StreamsConfigTest > 
shouldSupportPrefixedProducerConfigs PASSED

org.apache.kafka.streams.StreamsConfigTest > 

[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645243#comment-15645243
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.
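
A sketch of the shape the proposal could take (illustrative only, not a merged
API; only the single-method StateRestoreCallback exists today):

{code}
import org.apache.kafka.streams.processor.StateRestoreCallback;

public interface LifecycleStateRestoreCallback extends StateRestoreCallback {
    void beginRestore();  // invoked once before the first restore(key, value) call
    void endRestore();    // invoked once after the last key-value pair is restored
}
{code}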



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4322) StateRestoreCallback begin and end indication

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645244#comment-15645244
 ] 

ASF GitHub Bot commented on KAFKA-4322:
---

GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under the Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication




> StateRestoreCallback begin and end indication
> -
>
> Key: KAFKA-4322
> URL: https://issues.apache.org/jira/browse/KAFKA-4322
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Mark Shelton
>Assignee: Guozhang Wang
>Priority: Minor
>
> In Kafka Streams, the StateRestoreCallback interface provides only a single 
> method "restore(byte[] key, byte[] value)" that is called for every key-value 
> pair to be restored. 
> It would be nice to have "beginRestore" and "endRestore" methods as part of 
> StateRestoreCallback.
> Kafka Streams would call "beginRestore" before restoring any keys, and would 
> call "endRestore" when it determines that it is done. This allows an 
> implementation, for example, to report on the number of keys restored and 
> perform a commit after the last key was restored. Other uses are conceivable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-07 Thread markcshelton
Github user markcshelton closed the pull request at:

https://github.com/apache/kafka/pull/2105


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2105: KAFKA-4322 StateRestoreCallback begin and end indi...

2016-11-07 Thread markcshelton
GitHub user markcshelton reopened a pull request:

https://github.com/apache/kafka/pull/2105

KAFKA-4322 StateRestoreCallback begin and end indication

This adds a begin and end callback to StateRestoreCallback.

The contribution is my original work and I license the work to Apache Kafka 
under the Kafka's open source license.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markcshelton/kafka KAFKA-4322

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2105.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2105


commit 9f59ea6ff2aac94bdd84eaafda36c860b12e1dcd
Author: Mark Shelton 
Date:   2016-11-04T20:57:37Z

changes KAFKA-4322 StateRestoreCallback begin and end indication




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Jenkins build is back to normal : kafka-trunk-jdk7 #1675

2016-11-07 Thread Apache Jenkins Server
See 



[jira] [Updated] (KAFKA-4387) KafkaConsumer will enter an infinite loop if the polling thread is interrupted, and either commitSync or committed is called

2016-11-07 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated KAFKA-4387:
--
Status: Patch Available  (was: Open)

> KafkaConsumer will enter an infinite loop if the polling thread is 
> interrupted, and either commitSync or committed is called
> 
>
> Key: KAFKA-4387
> URL: https://issues.apache.org/jira/browse/KAFKA-4387
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>
> When the KafkaConsumer.commitSync method is called, the 
> ConsumerNetworkClient.poll(RequestFuture future) method will be called 
> with a future that only finishes when the commit request completes, or the 
> request times out.
> When the calling thread is interrupted, every call to the Selector underlying 
> the ConsumerNetworkClient will return immediately, while thread interrupt 
> state is not reset. The call to poll ends up looping until the request 
> timeout, at which point it drops back out to 
> ConsumerCoordinator.commitOffsetsSync which retries the request because 
> TimeoutException is retriable. This repeats indefinitely. 
> For the same reason as in https://issues.apache.org/jira/browse/KAFKA-4375, 
> it is good if the KafkaConsumer can handle interrupts in a reasonable way, 
> rather than having wakeup() be the only way to properly stop a consumer 
> thread.
> I think making ConsumerNetworkClient.maybeTriggerWakeup() throw a 
> WakeupException if the calling thread is interrupted makes sense, since an 
> interrupted thread won't be making progress in polling due to the way 
> Selector works, and KafkaConsumer users then don't have to handle wakeups and 
> interrupts separately.
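
A minimal sketch of the behaviour proposed in the last paragraph (not the
actual patch): surface interruption through the existing wakeup path so that
blocking consumer calls return instead of spinning until the request timeout.

{code}
if (Thread.currentThread().isInterrupted()) {
    throw new org.apache.kafka.common.errors.WakeupException();
}
{code}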



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4387) KafkaConsumer will enter an infinite loop if the polling thread is interrupted, and either commitSync or committed is called

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15645113#comment-15645113
 ] 

ASF GitHub Bot commented on KAFKA-4387:
---

GitHub user srdo opened a pull request:

https://github.com/apache/kafka/pull/2110

KAFKA-4387: Fix KafkaConsumer not responding correctly to interrupts,…

... throw InterruptException from blocking methods when interrupted

See https://issues.apache.org/jira/browse/KAFKA-4387

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/kafka KAFKA-4387

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2110


commit a3b8303bf33c24eb51130e1a4d1cd7d17771fecd
Author: Stig Rohde Døssing 
Date:   2016-11-07T19:09:22Z

KAFKA-4387: Fix KafkaConsumer not responding correctly to interrupts, throw 
InterruptException from blocking methods when interrupted




> KafkaConsumer will enter an infinite loop if the polling thread is 
> interrupted, and either commitSync or committed is called
> 
>
> Key: KAFKA-4387
> URL: https://issues.apache.org/jira/browse/KAFKA-4387
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>
> When the KafkaConsumer.commitSync method is called, the 
> ConsumerNetworkClient.poll(RequestFuture future) method will be called 
> with a future that only finishes when the commit request completes, or the 
> request times out.
> When the calling thread is interrupted, every call to the Selector underlying 
> the ConsumerNetworkClient will return immediately, while thread interrupt 
> state is not reset. The call to poll ends up looping until the request 
> timeout, at which point it drops back out to 
> ConsumerCoordinator.commitOffsetsSync which retries the request because 
> TimeoutException is retriable. This repeats indefinitely. 
> For the same reason as in https://issues.apache.org/jira/browse/KAFKA-4375, 
> it is good if the KafkaConsumer can handle interrupts in a reasonable way, 
> rather than having wakeup() be the only way to properly stop a consumer 
> thread.
> I think making ConsumerNetworkClient.maybeTriggerWakeup() throw a 
> WakeupException if the calling thread is interrupted makes sense, since an 
> interrupted thread won't be making progress in polling due to the way 
> Selector works, and KafkaConsumer users then don't have to handle wakeups and 
> interrupts separately.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #2110: KAFKA-4387: Fix KafkaConsumer not responding corre...

2016-11-07 Thread srdo
GitHub user srdo opened a pull request:

https://github.com/apache/kafka/pull/2110

KAFKA-4387: Fix KafkaConsumer not responding correctly to interrupts,…

... throw InterruptException from blocking methods when interrupted

See https://issues.apache.org/jira/browse/KAFKA-4387

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srdo/kafka KAFKA-4387

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2110


commit a3b8303bf33c24eb51130e1a4d1cd7d17771fecd
Author: Stig Rohde Døssing 
Date:   2016-11-07T19:09:22Z

KAFKA-4387: Fix KafkaConsumer not responding correctly to interrupts, throw 
InterruptException from blocking methods when interrupted




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-07 Thread Becket Qin
Hi Michael,

Yes, changing the logic in the log cleaner makes sense. There could be some
other things worth thinking about (e.g. the message size change after conversion),
though.
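
For illustration, a sketch of how such a configurable check could branch; the
mode names follow the value/flag/value_or_flag options quoted further down,
and all identifiers here are hypothetical rather than actual Kafka code:

    static boolean isTombstone(byte[] value, boolean tombstoneFlagSet, String mode) {
        switch (mode) {
            case "value":         return value == null;                      // current behaviour
            case "flag":          return tombstoneFlagSet;                   // new attribute bit only
            case "value_or_flag": return value == null || tombstoneFlagSet;  // transition period
            default: throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }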

The scenario I was thinking is the following:
Imagine a distributed caching system built on top of Kafka. A user is
consuming from a topic and it is guaranteed that if the user consumes to the
log end it will get the latest value for all the keys. Currently if the
consumer sees a null value it knows the key has been removed. Now let's say
we rolled out this change. And the producer applies a message with the
tombstone flag set, but the value was not null. When we append that message
to the log I suppose we will not do the down conversion if the broker has
set the message.format.version to the latest. Because the log cleaner won't
touch the active log segment, so that message will be sitting in the active
segment as is. Now when a consumer that hasn't upgraded yet consumes that
tombstone message in the active segment, it seems that the broker will need
to down convert that message to remove the value, right? In this case, we
cannot wait for the log cleaner to do the down conversion because that
message may have already been consumed before the log compaction happens.
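
A sketch of the down-conversion step this scenario implies for older fetch
versions (illustrative only, not broker code): the value has to be stripped so
that an old consumer still sees "null value means tombstone".

    static byte[] valueForOldConsumer(byte[] value, boolean tombstoneFlagSet) {
        return tombstoneFlagSet ? null : value;
    }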

Thanks,

Jiangjie (Becket) Qin



On Mon, Nov 7, 2016 at 9:59 AM, Michael Pearce 
wrote:

> Hi Becket,
>
> We were thinking more about having the logic that’s in the method
> shouldRetainMessage configurable via http://kafka.apache.org/
> documentation.html#brokerconfigs  at a broker/topic level. And then scrap
> auto converting the message, and allow organisations to manage the rollout
> of enabling of the feature.
> (this isn’t in documentation but in response to the discussion thread as
> an alternative approach to roll out the feature)
>
> Does this make any more sense?
>
> Thanks
> Mike
>
> On 11/3/16, 2:27 PM, "Becket Qin"  wrote:
>
> Hi Michael,
>
> Do you mean using a new configuration or is it just the existing
> message.format.version config? It seems the message.format.version
> config
> is enough in this case. And the default value would always be the
> latest
> version.
>
> > Message version migration would be handled as like in KIP-32
>
> Also just want to confirm on this. Today if an old consumer consumes a
> log
> compacted topic and sees an empty value, it knows that is a tombstone.
> After we start to use the attribute bit, a tombstone message can have a
> non-empty value. So by "like in KIP-32" you mean we will remove the
> value
> to down convert the message if the consumer version is old, right?
>
> Thanks.
>
> Jiangjie (Becket) Qin
>
> On Wed, Nov 2, 2016 at 1:37 AM, Michael Pearce 
> wrote:
>
> > Hi Joel , et al.
> >
> > Any comments on the below idea to handle roll out / compatibility of
> this
> > feature, using a configuration?
> >
> > Does it make sense/clear?
> > Does it add value?
> > Do we want to enforce flag by default, or value by default, or both?
> >
> > Cheers
> > Mike
> >
> >
> > On 10/27/16, 4:47 PM, "Michael Pearce" 
> wrote:
> >
> > Thanks, James, I think this is a really good addition to the KIP
> > details, please feel free to amend the wiki/add the use cases, also
> if any
> > others you think of. I definitely think its worthwhile documenting.
> If you
> > can’t let me know ill add them next week (just leaving for a long
> weekend
> > off)
> >
> > Re Joel and others comments about upgrade and compatibility.
> >
> > Rather than trying to auto manage this.
> >
> > Actually maybe we make a configuration option, both at server
> and per
> > topic level to control the behavior of how the server logic should
> work out
> > if the record is a tombstone record.
> >
> > e.g.
> >
> > key = compaction.tombstone.marker
> >
> > value options:
> >
> > value   (continues to use null value as tombstone marker)
> > flag (expects to use the tombstone flag)
> > value_or_flag (if either is true it treats the record as a
> tombstone)
> >
> > This way on upgrade users can keep current behavior, and slowly
> > migrate to the new. Having a transition period of using
> value_or_flag,
> > finally having flag only if an organization wishes to use null values
> > without it being treated as a tombstone marker (use case noted below)
> >
> > Having it both global broker level and topic override also
> allows some
> > flexibility here.
> >
> > Cheers
> > Mike
> >
> >
> >
> >
> >
> >
> > On 10/27/16, 8:03 AM, "James Cheng" 
> wrote:
> >
> > This KIP would definitely address a gap in the current
> 

[jira] [Commented] (KAFKA-4381) Add per partition lag metric to KafkaConsumer.

2016-11-07 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644978#comment-15644978
 ] 

Jiangjie Qin commented on KAFKA-4381:
-

Thanks Jason. I was not sure either. That said, we used to suffer from the 
sensor mbean name change. So maybe treating sensors as public APIs would be 
safer. I'll create a KIP this week.

> Add per partition lag metric to KafkaConsumer.
> --
>
> Key: KAFKA-4381
> URL: https://issues.apache.org/jira/browse/KAFKA-4381
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> Currently KafkaConsumer only has a metric of max lag across all the 
> partitions. It would be useful to know per partition lag as well.
> I remember there was a ticket created before but did not find it. So I am 
> creating this ticket.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KAFKA-4345: Run decktape test for each pull request

2016-11-07 Thread Raghav Kumar Gautam
Hi Ewen,

Thanks for the feedback. Answers are inlined.

On Sun, Nov 6, 2016 at 8:46 PM, Ewen Cheslack-Postava 
wrote:

> Yeah, I'm all for getting these to run more frequently and on lighter
> weight infrastructure. (By the way, I also saw the use of docker; I'd
> really like to get a "native" docker cluster type into ducktape at some
> point so all you have to do is bake the image and then spawn containers on
> demand.)
>
I completely agree, supporting docker integration in ducktape would be the
ideal solution of the problem.


>
> A few things. First, it'd be nice to know if we can chain these with normal
> PR builds or something like that. Even starting the system tests when we
> don't know the unit tests will pass seems like it'd be wasteful.
>
If we do chaining, one problem it will bring is that the turnaround time will
suffer. It would take 1.5 hrs to run unit tests and then another 1.5 hrs to run
ducktape tests. Also, don't devs run relevant unit tests before they submit a
patch?

>
> Second, agreed on getting things stable before turning this on across the
> board.

I have done some work on stabilizing the tests, but I need help from the Kafka
community to take this further. It would be great if someone could guide me on
how to do this. Should we start with a subset of tests that are stable and
enable others as we make progress? Who are the people that I can work with
on this problem?


> Confluent runs these tests nightly on full VMs in AWS and
> historically, besides buggy logic in tests, underprovisioned resources tend
> to be the biggest source of flakiness in tests.
>
 Good to know that I am not the only one worrying about this problem :-)

> Finally, should we be checking w/ infra and/or Travis folks before enabling
> something this expensive? Are the Storm integration tests of comparable
> cost? There are some in-flight patches for parallelizing test runs of
> ducktape tests (which also results in better utilization). But even with
> those changes, the full test run is still quite a few VM-hours per PR and
> we only expect it to increase.
>
We can ask the infra people about this, but I think this will not be a problem.
For example, Flink is using 11 hrs of computation time for each run. For Kafka we are going to
start with 6hrs. Also, with the docker setup we can bring up the whole 12
node cluster on the laptop and run ducktape tests against it. So, test
development cycles will become faster.

With Regards,
Raghav.



>
> -Ewen
>
> On Thu, Nov 3, 2016 at 11:26 AM, Becket Qin  wrote:
>
> > Thanks for the explanation, Raghav.
> >
> > If the workload is not a concern then it is probably fine to run tests
> for
> > each PR update, although it may not be necessary :)
> >
> > On Thu, Nov 3, 2016 at 10:40 AM, Raghav Kumar Gautam 
> > wrote:
> >
> > > Hi Becket,
> > >
> > > The tests would be run each time a PR is created/updated this will look
> > > similar to https://github.com/apache/storm/pulls. Ducktape tests take
> > > about
> > > 7-8 hours to run on my laptop. For travis-ci we can split them in
> groups
> > > and run them in parallel. This was done in the POC run which took 1.5
> > hrs.
> > > It had 10 splits with 5 jobs running in parallel.
> > > https://travis-ci.org/raghavgautam/kafka/builds/171502069
> > > For apache projects the limit is 30 jobs in parallel and across all
> > > projects, so I expect it to take less time but it also depends on the
> > > workload at the time.
> > > https://blogs.apache.org/infra/entry/apache_gains_additional_travis_ci
> > >
> > > Thanks,
> > > Raghav.
> > >
> > > On Thu, Nov 3, 2016 at 9:41 AM, Becket Qin 
> wrote:
> > >
> > > > Thanks Raghav,
> > > >
> > > > +1 for the idea in general.
> > > >
> > > > One thing I am wondering is when the tests would be run? Would it be
> > run
> > > > when we merge a PR or it would be run every time a PR is
> > created/updated?
> > > > I am not sure how long do the tests in other projects take. For Kafka
> > it
> > > > may take a few hours to run all the ducktape tests, will that be an
> > issue
> > > > if we run the tests for each updates of the PR?
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Thu, Nov 3, 2016 at 8:16 AM, Harsha Chintalapani  >
> > > > wrote:
> > > >
> > > > > Thanks, Raghav . I am +1 for having this in Kafka. It will help
> > > identify
> > > > > any potential issues, especially with big patches. Given that we've
> > > some
> > > > > tests failing due to timing issues
> > > > > can we disable the failing tests for now so that we don't get any
> > false
> > > > > negatives?
> > > > >
> > > > > Thanks,
> > > > > Harsha
> > > > >
> > > > > On Tue, Nov 1, 2016 at 11:47 AM Raghav Kumar Gautam <
> > rag...@apache.org
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I want to start a discussion about running 

[jira] [Created] (KAFKA-4388) Connect key and value converters are listed without default values

2016-11-07 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-4388:


 Summary: Connect key and value converters are listed without 
default values
 Key: KAFKA-4388
 URL: https://issues.apache.org/jira/browse/KAFKA-4388
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 0.10.1.0
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Minor


KIP-75 added per connector converters. This exposes the settings on a 
per-connector basis via the validation API. However, the way this is specified 
for each connector is via a config value with no default value. This means the 
validation API implies there is no setting unless you provide one.

It would be much better to include the default value extracted from the 
WorkerConfig instead so it's clear you shouldn't need to override the default.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3994) Deadlock between consumer heartbeat expiration and offset commit.

2016-11-07 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-3994:

Priority: Critical  (was: Major)

> Deadlock between consumer heartbeat expiration and offset commit.
> -
>
> Key: KAFKA-3994
> URL: https://issues.apache.org/jira/browse/KAFKA-3994
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0
>Reporter: Jiangjie Qin
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 0.10.1.1
>
>
> I got the following stacktraces from ConsumerBounceTest
> {code}
> ...
> "Test worker" #12 prio=5 os_prio=0 tid=0x7fbb28b7f000 nid=0x427c runnable 
> [0x7fbb06445000]
>java.lang.Thread.State: RUNNABLE
> at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x0003d48bcbc0> (a sun.nio.ch.Util$2)
> - locked <0x0003d48bcbb0> (a 
> java.util.Collections$UnmodifiableSet)
> - locked <0x0003d48bbd28> (a sun.nio.ch.EPollSelectorImpl)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:454)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:277)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:411)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1086)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1054)
> at 
> kafka.api.ConsumerBounceTest.consumeWithBrokerFailures(ConsumerBounceTest.scala:103)
> at 
> kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures(ConsumerBounceTest.scala:70)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
> at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
> at 
> 

[jira] [Commented] (KAFKA-4362) Consumer can fail after reassignment of the offsets topic partition

2016-11-07 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644945#comment-15644945
 ] 

Mayuresh Gharat commented on KAFKA-4362:


[~jasong35] from the details that Joel has listed, I think there are 2 issues:
1) Offset commits fail when the offsets topic partition is moved. This happens 
because the old coordinator incorrectly returns an IllegalArgumentException 
while checking for the message format version, when it is in fact first checking 
whether the replica is local. So the correct fix here would be to return 
"NotCoordinatorForGroupException" from the server side.
2) On the client side, right now, because the IllegalArgumentException thrown by 
the server is bubbled up as an UnknownException, the consumer is not able to handle 
it correctly.

I think once we return the correct (NotCoordinatorForGroupException) exception, 
the consumer should be able to handle it and proceed. 
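To illustrate the intended mapping (a sketch only, not the actual coordinator code, which is Scala; the boolean parameters are placeholders for the real checks):

{code}
import org.apache.kafka.common.protocol.Errors;

// Sketch of the error mapping for an offset commit against an offsets topic
// partition: check whether this broker still hosts the partition before looking
// up the message format version. NOT_COORDINATOR_FOR_GROUP is retriable, so the
// consumer rediscovers the coordinator instead of failing on an unknown error.
final class OffsetCommitErrorSketch {
    static Errors errorFor(boolean partitionIsLocal, boolean formatVersionKnown) {
        if (!partitionIsLocal)
            return Errors.NOT_COORDINATOR_FOR_GROUP; // proposed behaviour
        if (!formatVersionKnown)
            return Errors.UNKNOWN;                   // what the IllegalArgumentException surfaces as today
        return Errors.NONE;
    }
}
{code}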

> Consumer can fail after reassignment of the offsets topic partition
> ---
>
> Key: KAFKA-4362
> URL: https://issues.apache.org/jira/browse/KAFKA-4362
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0
>Reporter: Joel Koshy
>Assignee: Mayuresh Gharat
>
> When a consumer offsets topic partition reassignment completes, an offset 
> commit shows this:
> {code}
> java.lang.IllegalArgumentException: Message format version for partition 100 
> not found
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager$$anonfun$14.apply(GroupMetadataManager.scala:633)
>  ~[kafka_2.10.jar:?]
> at scala.Option.getOrElse(Option.scala:120) ~[scala-library-2.10.4.jar:?]
> at 
> kafka.coordinator.GroupMetadataManager.kafka$coordinator$GroupMetadataManager$$getMessageFormatVersionAndTimestamp(GroupMetadataManager.scala:632)
>  ~[kafka_2.10.jar:?]
> at 
> ...
> {code}
> The issue is that the replica has been deleted so the 
> {{GroupMetadataManager.getMessageFormatVersionAndTimestamp}} throws this 
> exception instead which propagates as an unknown error.
> Unfortunately consumers don't respond to this and will fail their offset 
> commits.
> One workaround in the above situation is to bounce the cluster - the consumer 
> will be forced to rediscover the group coordinator.
> (Incidentally, the message incorrectly prints the number of partitions 
> instead of the actual partition.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4387) KafkaConsumer will enter an infinite loop if the polling thread is interrupted, and either commitSync or committed is called

2016-11-07 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644936#comment-15644936
 ] 

Stig Rohde Døssing commented on KAFKA-4387:
---

Fine by me. I'll make a PR for it shortly

> KafkaConsumer will enter an infinite loop if the polling thread is 
> interrupted, and either commitSync or committed is called
> 
>
> Key: KAFKA-4387
> URL: https://issues.apache.org/jira/browse/KAFKA-4387
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>
> When the KafkaConsumer.commitSync method is called, the 
> ConsumerNetworkClient.poll(RequestFuture future) method will be called 
> with a future that only finishes when the commit request completes, or the 
> request times out.
> When the calling thread is interrupted, every call to the Selector underlying 
> the ConsumerNetworkClient will return immediately, while thread interrupt 
> state is not reset. The call to poll ends up looping until the request 
> timeout, at which point it drops back out to 
> ConsumerCoordinator.commitOffsetsSync which retries the request because 
> TimeoutException is retriable. This repeats indefinitely. 
> For the same reason as in https://issues.apache.org/jira/browse/KAFKA-4375, 
> it is good if the KafkaConsumer can handle interrupts in a reasonable way, 
> rather than having wakeup() be the only way to properly stop a consumer 
> thread.
> I think making ConsumerNetworkClient.maybeTriggerWakeup() throw a 
> WakeupException if the calling thread is interrupted makes sense, since an 
> interrupted thread won't be making progress in polling due to the way 
> Selector works, and KafkaConsumer users then don't have to handle wakeups and 
> interrupts separately.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4387) KafkaConsumer will enter an infinite loop if the polling thread is interrupted, and either commitSync or committed is called

2016-11-07 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4387?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644913#comment-15644913
 ] 

Jason Gustafson commented on KAFKA-4387:


I think it's reasonable to have KafkaConsumer handle interrupts, but it seems 
confusing to mix the behavior with wakeup(). It's not that much effort to 
handle both of them (especially with Java 7 multiple exception catching) and 
some users may really want to treat them differently. We already have 
{{org.apache.kafka.common.errors.InterruptException}} which is raised from 
KafkaProducer, so maybe we could use it instead?
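For illustration, handling the two separately in application code could look like this (a sketch; the consumer raising InterruptException is the proposal here, not current behaviour):

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerLoop implements Runnable {
    private final KafkaConsumer<String, String> consumer;

    public ConsumerLoop(Properties config) {
        this.consumer = new KafkaConsumer<>(config);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.offset() + ": " + record.value());
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Another thread called consumer.wakeup() to shut this consumer down.
        } catch (InterruptException e) {
            // This thread was interrupted (proposed consumer behaviour, mirroring the producer).
        } finally {
            consumer.close();
        }
    }
}
{code}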

> KafkaConsumer will enter an infinite loop if the polling thread is 
> interrupted, and either commitSync or committed is called
> 
>
> Key: KAFKA-4387
> URL: https://issues.apache.org/jira/browse/KAFKA-4387
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.0.1
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>
> When the KafkaConsumer.commitSync method is called, the 
> ConsumerNetworkClient.poll(RequestFuture future) method will be called 
> with a future that only finishes when the commit request completes, or the 
> request times out.
> When the calling thread is interrupted, every call to the Selector underlying 
> the ConsumerNetworkClient will return immediately, while thread interrupt 
> state is not reset. The call to poll ends up looping until the request 
> timeout, at which point it drops back out to 
> ConsumerCoordinator.commitOffsetsSync which retries the request because 
> TimeoutException is retriable. This repeats indefinitely. 
> For the same reason as in https://issues.apache.org/jira/browse/KAFKA-4375, 
> it is good if the KafkaConsumer can handle interrupts in a reasonable way, 
> rather than having wakeup() be the only way to properly stop a consumer 
> thread.
> I think making ConsumerNetworkClient.maybeTriggerWakeup() throw a 
> WakeupException if the calling thread is interrupted makes sense, since an 
> interrupted thread won't be making progress in polling due to the way 
> Selector works, and KafkaConsumer users then don't have to handle wakeups and 
> interrupts separately.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-11-07 Thread Michael Pearce
Hi Becket,

We were thinking more about making the logic that’s in the method 
shouldRetainMessage configurable via 
http://kafka.apache.org/documentation.html#brokerconfigs at a broker/topic 
level, then scrapping the auto-conversion of messages and allowing organisations to 
manage the rollout of enabling the feature.
(This isn’t in the documentation; it's a response to the discussion thread with an 
alternative approach to rolling out the feature.)
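
To make it concrete, something along these lines (an illustrative Java sketch only, not the actual broker code; the config values are the ones from the proposal quoted below):

enum TombstoneMarkerMode { VALUE, FLAG, VALUE_OR_FLAG }

final class TombstoneCheck {
    // Sketch of how a broker/topic-level compaction.tombstone.marker setting could
    // drive the "is this record a tombstone?" decision during log compaction.
    static boolean isTombstone(TombstoneMarkerMode mode, boolean tombstoneFlag, byte[] value) {
        switch (mode) {
            case VALUE:         return value == null;                  // current behaviour
            case FLAG:          return tombstoneFlag;                  // new attribute bit only
            case VALUE_OR_FLAG: return tombstoneFlag || value == null; // transition period
            default:            throw new IllegalStateException("Unknown mode: " + mode);
        }
    }
}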

Does this make any more sense?

Thanks
Mike

On 11/3/16, 2:27 PM, "Becket Qin"  wrote:

Hi Michael,

Do you mean using a new configuration, or is it just the existing
message.format.version config? It seems the message.format.version config
is enough in this case. And the default value would always be the latest
version.

> Message version migration would be handled as like in KIP-32

Also just want to confirm on this. Today if an old consumer consumes a log
compacted topic and sees an empty value, it knows that is a tombstone.
After we start to use the attribute bit, a tombstone message can have a
non-empty value. So by "like in KIP-32" you mean we will remove the value
to down convert the message if the consumer version is old, right?

Thanks.

Jiangjie (Becket) Qin

On Wed, Nov 2, 2016 at 1:37 AM, Michael Pearce 
wrote:

> Hi Joel , et al.
>
> Any comments on the below idea to handle roll out / compatibility of this
> feature, using a configuration?
>
> Does it make sense/clear?
> Does it add value?
> Do we want to enforce flag by default, or value by default, or both?
>
> Cheers
> Mike
>
>
> On 10/27/16, 4:47 PM, "Michael Pearce"  wrote:
>
> Thanks, James, I think this is a really good addition to the KIP
> details. Please feel free to amend the wiki/add the use cases, plus any
> others you think of. I definitely think it's worthwhile documenting. If you
> can’t, let me know and I'll add them next week (just leaving for a long weekend
> off).
>
> Re Joel and others comments about upgrade and compatibility.
>
> Rather than trying to auto manage this.
>
> Actually maybe we make a configuration option, both at server and per
> topic level, to control how the server logic should work out
> if the record is a tombstone record.
>
> e.g.
>
> key = compaction.tombstone.marker
>
> value options:
>
> value   (continues to use null value as tombstone marker)
> flag (expects to use the tombstone flag)
> value_or_flag (if either is true it treats the record as a tombstone)
>
> This way on upgrade users can keep current behavior, and slowly
> migrate to the new. Having a transition period of using value_or_flag,
> finally having flag only if an organization wishes to use null values
> without it being treated as a tombstone marker (use case noted below)
>
> Having it both global broker level and topic override also allows some
> flexibility here.
>
> Cheers
> Mike
>
>
>
>
>
>
> On 10/27/16, 8:03 AM, "James Cheng"  wrote:
>
> This KIP would definitely address a gap in the current
> functionality, where you currently can't have a tombstone with any
> associated content.
>
> That said, I'd like to talk about use cases, to make sure that
> this is in fact useful. The KIP should be updated with whatever use cases
> we come up with.
>
> First of all, an observation: When we speak about log compaction,
> we typically think of "the latest message for a key is retained". In that
> respect, a delete tombstone (i.e. a message with a null payload) is treated
> the same as any other Kafka message: the latest message is retained. It
> doesn't matter whether the latest message is null, or if the latest message
> has actual content. In all cases, the last message is retained.
>
> The only way a delete tombstone is treated differently from other
> Kafka messages is that it automatically disappears after a while. The time
> of deletion is specified using delete.retention.ms.
>
> So what we're really talking about is, do we want to support
> messages in a log-compacted topic that auto-delete themselves after a while?
>
> In a thread from 2015, there was a discussion on first-class
> support of headers between Roger Hoover, Felix GV, Jun Rao, and I. See
> thread at https://groups.google.com/d/msg/confluent-platform/8xPbjyUE_7E/yQ1AeCufL_gJ . In that thread, Jun
> raised a good question that I didn't have a good answer 

[jira] [Commented] (KAFKA-4381) Add per partition lag metric to KafkaConsumer.

2016-11-07 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15644874#comment-15644874
 ] 

Jason Gustafson commented on KAFKA-4381:


I'm not sure. Have we done KIPs for individual metrics before? Seems like 
overkill to me, but it should go through quickly enough. It might also be good 
to encourage a more formal treatment of the metric APIs. I'd be ok either way I 
guess.

> Add per partition lag metric to KafkaConsumer.
> --
>
> Key: KAFKA-4381
> URL: https://issues.apache.org/jira/browse/KAFKA-4381
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.10.2.0
>
>
> Currently KafkaConsumer only has a metric of max lag across all the 
> partitions. It would be useful to know per partition lag as well.
> I remember there was a ticket created before but did not find it. So I am 
> creating this ticket.
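(In the meantime, an application-side approximation of per-partition lag is possible; a rough sketch assuming the 0.10.1 Java consumer API:)

{code}
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class PerPartitionLagSketch {
    // Approximate per-partition lag as the log-end offset (endOffsets) minus the
    // consumer's current position, for each assigned partition.
    static void printLag(KafkaConsumer<?, ?> consumer) {
        Set<TopicPartition> assignment = consumer.assignment();
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
        for (TopicPartition tp : assignment)
            System.out.println(tp + " lag=" + (endOffsets.get(tp) - consumer.position(tp)));
    }
}
{code}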



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4387) KafkaConsumer will enter an infinite loop if the polling thread is interrupted, and either commitSync or committed is called

2016-11-07 Thread JIRA
Stig Rohde Døssing created KAFKA-4387:
-

 Summary: KafkaConsumer will enter an infinite loop if the polling 
thread is interrupted, and either commitSync or committed is called
 Key: KAFKA-4387
 URL: https://issues.apache.org/jira/browse/KAFKA-4387
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.0.1
Reporter: Stig Rohde Døssing
Assignee: Stig Rohde Døssing


When the KafkaConsumer.commitSync method is called, the 
ConsumerNetworkClient.poll(RequestFuture future) method will be called with 
a future that only finishes when the commit request completes, or the request 
times out.

When the calling thread is interrupted, every call to the Selector underlying 
the ConsumerNetworkClient will return immediately, while thread interrupt state 
is not reset. The call to poll ends up looping until the request timeout, at 
which point it drops back out to ConsumerCoordinator.commitOffsetsSync which 
retries the request because TimeoutException is retriable. This repeats 
indefinitely. 

For the same reason as in https://issues.apache.org/jira/browse/KAFKA-4375, it 
is good if the KafkaConsumer can handle interrupts in a reasonable way, rather 
than having wakeup() be the only way to properly stop a consumer thread.

I think making ConsumerNetworkClient.maybeTriggerWakeup() throw a 
WakeupException if the calling thread is interrupted makes sense, since an 
interrupted thread won't be making progress in polling due to the way Selector 
works, and KafkaConsumer users then don't have to handle wakeups and interrupts 
separately.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-11-07 Thread Michael Pearce
Hi Roger,

Thanks for the support.

I think the key thing is to have a common key space to make an ecosystem, there 
does have to be some level of contract for people to play nicely.

Having a map, or as per the current proposal in the KIP a numerical key space 
map, is a level of the contract that most people would expect.

I think a good example is the one someone else linked in a previous comment: the AWS 
blog and implemented API where originally they didn’t have a header space but 
now they do, and keys are uniform while the value can be a string, an int, anything.

Having a custom MetadataSerializer is something we had played with, but we 
discounted the idea: if you want everyone to work the same way in the 
ecosystem, having this customizable as well makes that a bit harder. Think 
about making the whole message record custom-serializable; it would be 
fairly tricky (though not impossible) to make that work nicely. 
Having the value customizable is, we thought, a reasonable tradeoff here between 
flexibility and the contract of interaction between different parties.

Is there a particular case or benefit of having serialization customizable that 
you have in mind?

Saying this, it is obviously something that could be implemented if there is a 
need. If we did go down this avenue, I think a default serializer implementation 
should exist so that, per the 80:20 rule, people can just have the broker and clients 
use the default behavior.
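
For example, the default could be as simple as length-prefixed (int key, byte[] value) entries. A purely illustrative sketch; the field widths here are assumptions for discussion, not what the KIP specifies:

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative default header (de)serialization: a sequence of
// (int16 key, int32 length, bytes) entries.
final class DefaultHeaderCodec {
    static byte[] serialize(Map<Integer, byte[]> headers) {
        int size = 0;
        for (byte[] v : headers.values())
            size += 2 + 4 + v.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (Map.Entry<Integer, byte[]> e : headers.entrySet()) {
            buf.putShort(e.getKey().shortValue());
            buf.putInt(e.getValue().length);
            buf.put(e.getValue());
        }
        return buf.array();
    }

    static Map<Integer, byte[]> deserialize(byte[] data) {
        Map<Integer, byte[]> headers = new LinkedHashMap<>();
        ByteBuffer buf = ByteBuffer.wrap(data);
        while (buf.hasRemaining()) {
            int key = buf.getShort();
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headers.put(key, value);
        }
        return headers;
    }
}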

Cheers
Mike

On 11/6/16, 5:25 PM, "radai"  wrote:

Making header _key_ serialization configurable potentially undermines the
broad usefulness of the feature (any point along the path must be able to
read the header keys; the values may be whatever and require more intimate
knowledge of the code that produced specific headers, but keys should be
universally readable).

It would also make it hard to write really portable plugins - say I wrote a
large message splitter/combiner - if I rely on key "largeMessage" and
values of the form "1/20", someone who uses (contrived example) Map wouldn't be able to re-use my code.

Not the end of the world within an organization, but problematic if you
want to enable an ecosystem.

On Thu, Nov 3, 2016 at 2:04 PM, Roger Hoover  wrote:

>  As others have laid out, I see strong reasons for a common message
> metadata structure for the Kafka ecosystem.  In particular, I've seen that
> even within a single organization, infrastructure teams often own the
> message metadata while application teams own the application-level data
> format.  Allowing metadata and content to have different structure and
> evolve separately is very helpful for this.  Also, I think there's a lot of
> value to having a common metadata structure shared across the Kafka
> ecosystem so that tools which leverage metadata can more easily be shared
> across organizations and integrated together.
>
> The question is, where does the metadata structure belong?  Here's my take:
>
> We change the Kafka wire and on-disk format from a (key, value) model to
> a (key, metadata, value) model where all three are byte arrays from the
> brokers point of view.  The primary reason for this is that it provides a
> backward compatible migration path forward.  Producers can start populating
> metadata fields before all consumers understand the metadata structure.
> For people who already have custom envelope structures, they can populate
> their existing structure and the new structure for a while as they make the
> transition.
>
> We could stop there and let the clients plug in a KeySerializer,
> MetadataSerializer, and ValueSerializer but I think it is also be useful 
to
> have a default MetadataSerializer that implements a key-value model 
similar
> to AMQP or HTTP headers.  Or we could go even further and prescribe a
> Map or Map data model for headers in the
> clients (while still allowing custom serialization of the header data
> model).
>
> I think this would address Radai's concerns:
> 1. All client code would not need to be updated to know about the
> container.
> 2. Middleware friendly clients would have a standard header data model to
> work with.
> 3. KIP is required both b/c of broker changes and because of client API
> changes.
>
> Cheers,
>
> Roger
>
>
> On Wed, Nov 2, 2016 at 4:38 PM, radai  wrote:
>
> > my biggest issues with a "standard" wrapper format:
> >
> > 1. _ALL_ client _CODE_ (as opposed to kafka lib version) must be updated
> to
> > know about the container, because any old naive code trying to directly
> > deserialize its own payload would 

Re: [DISCUSS] KIP 88: DescribeGroups Protocol Update

2016-11-07 Thread Jason Gustafson
Hey Vahid,

Thanks for the KIP. If I understand correctly, the problem is how to fetch
existing offsets for a group which has no active members, right? I'm not
totally clear why we need to modify the DescribeGroups API in order to
achieve this since we already have the OffsetFetch API. I think the
limitation currently is that you need to know the partitions to fetch
offsets for, but perhaps we could modify it to support the "null=all"
semantics that we used for the TopicMetadata API?

Thanks,
Jason

On Thu, Nov 3, 2016 at 11:09 AM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Hi all,
>
> I started a new KIP under
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 88%3A+DescribeGroups+Protocol+Update
> .
>
> The KIP is a proposal to update the DescribeGroups protocol to address
> KAFKA-3853 (https://issues.apache.org/jira/browse/KAFKA-3853).
>
> I appreciate your feedback.
>
> Thanks.
> --Vahid
>
>


[jira] [Updated] (KAFKA-4383) Update API design subsection to reflect the current implementation of Producer/Consumer

2016-11-07 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-4383:
-
Fix Version/s: (was: 0.10.0.1)
   (was: 0.10.0.0)

> Update API design subsection to reflect the current implementation of 
> Producer/Consumer
> ---
>
> Key: KAFKA-4383
> URL: https://issues.apache.org/jira/browse/KAFKA-4383
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 0.10.0.1
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>
> After 0.9.0 and 0.10.0 the site docs were updated to reflect transition to 
> the new APIs for producer and consumer. Changes were made in sections such as 
> {{2. APIS}} and {{3. CONFIGURATION}}. 
> However, the related subsections under {{5.IMPLEMENTATION}} still describe 
> the implementation details of the old producer and consumer APIs. This 
> section needs to be re-written to reflect the implementation status of the 
> new APIs (possibly by retaining the description for the old APIs as well in a 
> separate subsection). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4382) Fix broken fragments in site docs

2016-11-07 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-4382:
-
Fix Version/s: (was: 0.10.0.1)

> Fix broken fragments in site docs
> -
>
> Key: KAFKA-4382
> URL: https://issues.apache.org/jira/browse/KAFKA-4382
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.10.0.1
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Minor
>  Labels: documentation
>   Original Estimate: 6h
>  Remaining Estimate: 6h
>
> There are just a few broken fragments in the current version of site docs. 
> For instance, under documentation.html in 0.10.1 such fragments are: 
> {quote}
> http://kafka.apache.org/documentation.html#newconsumerapi
> http://kafka.apache.org/documentation#config_broker
> http://kafka.apache.org/documentation#security_kerberos_sasl_clientconfig
> {quote}
> A more thorough search in the previous versions of the documentation might 
> reveal a few more. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka-site issue #29: Update the website repo link in code.html to point to ...

2016-11-07 Thread becketqin
Github user becketqin commented on the issue:

https://github.com/apache/kafka-site/pull/29
  
@ijuma That's fair. I updated the page. The thing I found confusing is that 
we have documentation in three places: 1) the Kafka code repo, 2) the kafka-site repo, 
3) the wiki (some documentation is just a wiki page). I'll update 
https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Website+Documentation+Changes
 to clarify this a little.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #2108: MINOR: Fix regex on connector path param in Connec...

2016-11-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2108


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


replica fetch error and shuabing

2016-11-07 Thread Json Tu
Hi, when I move __consumer_offsets from an old broker to a new broker, we encounter 
the following error and it keeps flooding the log.
server.log.2016-11-07-19:[2016-11-07 19:17:15,392] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,476] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,573] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,640] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,697] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)
server.log.2016-11-07-19:[2016-11-07 19:17:15,770] ERROR Found invalid messages 
during fetch for partition [__consumer_offsets,10] offset 13973569 error 
Message found with corrupt size (0) in shallow iterator 
(kafka.server.ReplicaFetcherThread)

Can anyone help solve it? Thanks.


[GitHub] kafka pull request #2109: MINOR: fix incorrect logging in StreamThread

2016-11-07 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/2109

MINOR: fix incorrect logging in StreamThread

Fix incorrect logging when unable to create an active task. The output was: 
"Failed to create an active task %s: " 
It should include the taskId.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka minor-logging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2109


commit 2f531b1a137e982bb8103143d1bbd4d18dd40b01
Author: Damian Guy 
Date:   2016-11-07T15:16:54Z

fix incorrent logging in StreamThread




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


RE: Kafka Connect key.converter and value.converter properties for Avro encoding

2016-11-07 Thread david.franklin
Hi Ewen,

Sorry but I didn't understand much of that.

I currently have an implementation of the Converter interface that uses Avro's
BinaryEncoder/Decoder, SpecificDatumReader/Writer.

The main mismatch I faced is that I need to use org.apache.avro.Schema for 
serialization whereas the Converter interface requires a 
org.apache.kafka.connect.data.Schema schema.

In the absence of a transformer to interconvert between these Schema 
representations (are any available?) I have, for now, gone for the slightly 
fragile approach of inferring the schema from the topic name (we currently have 
a topic per event type).  This means I ignore the schema parameter in 
fromConnectData and return a null schema in toConnectData.
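
Roughly, the shape looks like this (a sketch; schemaForTopic() and readRecord() are hypothetical helpers standing in for the topic-name lookup and Avro decoding, not Connect or Avro APIs):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

// Sketch of the approach described above: ignore the Connect Schema and infer the
// Avro schema from the topic name.
public class TopicInferredAvroConverter implements Converter {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { /* nothing to configure */ }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        org.apache.avro.Schema avroSchema = schemaForTopic(topic);  // hypothetical lookup
        try {
            SpecificDatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(avroSchema);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
            writer.write((SpecificRecord) value, encoder);
            encoder.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new DataException("Avro encoding failed for topic " + topic, e);
        }
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        // Returns a null Connect schema, as described above; readRecord() would use a
        // SpecificDatumReader built from schemaForTopic(topic).
        return new SchemaAndValue(null, readRecord(topic, value));
    }

    private org.apache.avro.Schema schemaForTopic(String topic) { throw new UnsupportedOperationException("hypothetical"); }
    private Object readRecord(String topic, byte[] value) { throw new UnsupportedOperationException("hypothetical"); }
}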

With this I can create a simple Kafka consumer that correctly reads these 
binary Avro encoded events generated by my Kafka Connect source, once I've set 
the Kafka value.deserializer property to my serializer class which implements 
Deserializer, which in turn (re)uses my Kafka Connect converter 
class internally.

However, I've noticed something odd: the fromConnectData  invocations come in 2 
forms:

1. schema = null, record = null
2. schema = Schema{BYTES}, record = a JSON structure

Schema{BYTES} is, I presume, because I specify Schema.BYTES_SCHEMA as the 4th 
arg to the SourceRecord ctr.

Any idea why form 1 occurs?

Thanks again,
David







-Original Message-
From: Ewen Cheslack-Postava [mailto:e...@confluent.io] 
Sent: 07 November 2016 04:35
To: dev@kafka.apache.org
Subject: Re: Kafka Connect key.converter and value.converter properties for 
Avro encoding

You won't be accepting/returning SpecificRecords directly when working with 
Connect's API. Connect intentionally uses an interface that is different from 
Kafka serializers because we deal with structured data that the connectors need 
to be able to understand. We define a generic runtime representation for data 
(under org.apache.kafka.connect.data) and Converters are responsible for taking 
the data all the way through any byte[] -> serialization-specific format (e.g. 
SpecificRecord) -> Connect Data API.

Even though your approach to handling Avro isn't exactly the same, I'd still 
suggest taking a look at our implementation. You'll be able to see how we 
separate this into those two steps, utilizing our normal Avro(De)Serializer to 
do byte[] <-> Avro conversions and then a separate class to do Avro <-> Connect 
Data API conversions. You could probably reuse the Avro <-> Connect Data API 
directly and only use the small bit of code you included for doing the byte[] 
<-> Avro conversion.

re: configure(), yes, it's safe for it to be a noop as long as your Converter 
really doesn't require *any* configuration. But I would guess it at least needs 
to know the SpecificRecord class or schema you are trying to (de)serialize.

-Ewen

On Thu, Nov 3, 2016 at 7:25 AM,  wrote:

> Thanks to Gwen and Tommy Baker for their helpful replies.
>
> Currently, the environment I need to work with doesn't use the Schema 
> Registry; hopefully one day it will but for now that's not an option.
> Events are written to Kafka without the schema embedded and each side 
> of the interface assumes a given schema, with the consequent risks accepted.
>
> To serialize a SpecificRecord for the 
> org.apache.kafka.connect.storage.Converter
> interface (in the absence of access to the Confluent implementation
> classes) I was thinking of something along these lines to Avro encode 
> a
> SpecificRecord:
>
> private byte[] toAvro(Schema schema, SpecificRecord record) throws IOException {
> SpecificDatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(schema);
> ByteArrayOutputStream baos = new ByteArrayOutputStream();
> BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder(baos, null);
> writer.write(record, binaryEncoder);
> binaryEncoder.flush();  // flush before reading the bytes, or the output may be truncated
> return baos.toByteArray();
> }
>
> To work with Kafka Connect I need to comply with the 
> org.apache.kafka.connect.storage.Converter interface. The Converter 
> interface defines the following methods:
>
> void configure(Map<String, ?> configs, boolean isKey);
> byte[] fromConnectData(String topic, Schema schema, Object value);
> SchemaAndValue toConnectData(String topic, byte[] value);
>
> Is it safe to provide a no-op implementation for configure()?
>
> The toConnectData() method will presumably be achieved via a 
> corresponding SpecificDatumReader.
>
> Does this look a reasonable approach?
>
> Many thanks if you've read this far!
>
> Regards,
> David
>
>
> -Original Message-
> From: Gwen Shapira [mailto:g...@confluent.io]
> Sent: 02 November 2016 21:18
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect key.converter and value.converter 
> properties for Avro encoding
>
> Both the Confluent Avro Converter and the Confluent Avro Serializer 
> use the Schema Registry. The reason is, as Tommy Becker 

[jira] [Updated] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-07 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4384:
---
Fix Version/s: 0.10.1.1

> ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted 
> message
> 
>
> Key: KAFKA-4384
> URL: https://issues.apache.org/jira/browse/KAFKA-4384
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Ubuntu 12.04, AWS D2 instance
>Reporter: Jun He
> Fix For: 0.10.1.1
>
>
> We recently discovered an issue in Kafka 0.9.0.1 where 
> ReplicaFetcherThread stopped after it received a corrupted 
> message. As the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they 
> may have a similar issue.
> Here are system logs related to this issue.
> 2016-10-30 - 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.apply - Found invalid messages during fetch for 
> partition [logs,41] offset 39021512238 error Message is corrupt (stored crc = 
> 2028421553, computed crc = 3577227678)
> 2016-10-30 - 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error due 
> to kafka.common.KafkaException: - error processing data for partition 
> [logs,41] offset 39021512301 Caused - by: java.lang.RuntimeException: Offset 
> mismatch: fetched offset = 39021512301, log end offset = 39021512238.
> First, ReplicaFetcherThread got a corrupted message (offset 39021512238) due 
> to some blip.
> Line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
>  threw exception
> Then, Line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
>  caught it and logged this error.
> Because 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
>  updated the topic partition offset to the fetched latest one in 
> partitionMap. So ReplicaFetcherThread skipped the batch with corrupted 
> messages. 
> Based on 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
>  the ReplicaFetcherThread then directly fetched the next batch of messages 
> (with offset 39021512301)
> Next, ReplicaFetcherThread stopped because the log end offset (still 
> 39021512238) didn't match the fetched message (offset 39021512301).
> A quick fix is to move line 134 to be after line 138.
> Would be great to have your comments and please let me know if a Jira issue 
> is needed. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka-site issue #29: Update the website repo link in code.html to point to ...

2016-11-07 Thread ijuma
Github user ijuma commented on the issue:

https://github.com/apache/kafka-site/pull/29
  
@becketqin, it's just that we point to the Apache git repo (which is the 
source repository) a few lines above for the code repo. We should probably be 
consistent and mention GitHub for both if we think that's useful. And we should 
probably point to the Apache one first (since it's the source repository) and 
mention the GitHub mirror second.

Finally, contributing documentation changes is a little different and 
there's a separate section for it in: http://kafka.apache.org/contributing.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site issue #26: add trademark symbol on all pages plus longer footer m...

2016-11-07 Thread ijuma
Github user ijuma commented on the issue:

https://github.com/apache/kafka-site/pull/26
  
@derrickdoo, can you please close this PR since it has been merged?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [ANNOUNCE] New committer: Jiangjie (Becket) Qin

2016-11-07 Thread Michael Noll
Congratulations, Becket!

Best wishes,
Michael

On Thu, Nov 3, 2016 at 5:13 PM, Efe Gencer  wrote:

> Congratulations, Becket!
>
> Best,
> Efe
>
> 2016-11-03 11:22 GMT-04:00 Neha Narkhede :
>
> > Congratulations, Becket! Well done.
> >
> > On Wed, Nov 2, 2016 at 1:34 PM Eno Thereska 
> > wrote:
> >
> > > Congrats!
> > > Eno
> > >
> > > > On 1 Nov 2016, at 05:57, Harsha Chintalapani 
> wrote:
> > > >
> > > > Congrats Becket!
> > > > -Harsha
> > > >
> > > > On Mon, Oct 31, 2016 at 2:13 PM Rajini Sivaram <
> > > rajinisiva...@googlemail.com>
> > > > wrote:
> > > >
> > > >> Congratulations, Becket!
> > > >>
> > > >> On Mon, Oct 31, 2016 at 8:38 PM, Matthias J. Sax <
> > matth...@confluent.io
> > > >
> > > >> wrote:
> > > >>
> > > >>> -BEGIN PGP SIGNED MESSAGE-
> > > >>> Hash: SHA512
> > > >>>
> > > >>> Congrats!
> > > >>>
> > > >>> On 10/31/16 11:01 AM, Renu Tewari wrote:
> > >  Congratulations Becket!! Absolutely thrilled to hear this. Well
> > >  deserved!
> > > 
> > >  regards renu
> > > 
> > > 
> > >  On Mon, Oct 31, 2016 at 10:35 AM, Joel Koshy  >
> > >  wrote:
> > > 
> > > > The PMC for Apache Kafka has invited Jiangjie (Becket) Qin to
> > > > join as a committer and we are pleased to announce that he has
> > > > accepted!
> > > >
> > > > Becket has made significant contributions to Kafka over the last
> > > > two years. He has been deeply involved in a broad range of KIP
> > > > discussions and has contributed several major features to the
> > > > project. He recently completed the implementation of a series of
> > > > improvements (KIP-31, KIP-32, KIP-33) to Kafka’s message format
> > > > that address a number of long-standing issues such as avoiding
> > > > server-side re-compression, better accuracy for time-based log
> > > > retention, log roll and time-based indexing of messages.
> > > >
> > > > Congratulations Becket! Thank you for your many contributions. We
> > > > are excited to have you on board as a committer and look forward
> > > > to your continued participation!
> > > >
> > > > Joel
> > > >
> > > 
> > > >>> -BEGIN PGP SIGNATURE-
> > > >>> Comment: GPGTools - https://gpgtools.org
> > > >>>
> > > >>> iQIcBAEBCgAGBQJYF6uzAAoJECnhiMLycopPBuwP/1N2MtwWw7ms5gAfT/jvVCGi
> > > >>> mdNvdJprSwJHe3qwsc+glsvAqwS6OZfaVzK2qQcaxMX5KjQtwkkOKyErOl9hG7jD
> > > >>> Vw0aDcCbPuV2oEZ4m9K2J4Q3mZIfFrevicVb7oPGf4Yjt1sh9wxP08o7KHP2l5pN
> > > >>> 3mpIBEDp4rZ2pg/jXldyh57dW1btg3gZi1gNczWvXEAKf1ypXRPwPeDbvXADXDv3
> > > >>> 0NgmcXn242geoggnIbL30WgjH0bwHpVjLBr++YQ33FzRoHzASfAYHR/jSDKAytQe
> > > >>> a7Bkc69Bb1NSzkfhiJa+VW9V2DweO8kD+Xfz4dM02GQF0iJkAqare7a6zWedk/+U
> > > >>> hJRPz+tGlDSLePCYdyNj1ivJrFOmIQtyFOI3SBANfaneOmGJhPKtlNQQlNFKDbWS
> > > >>> CD1pBsc1iHNq6rXy21evc/aFk0Rrfs5d4rU9eG6jD8jc1mCbSwtzJI0vweX0r9Y/
> > > >>> 6Ao8cnsmDejYfap5lUMWeQfZOTkNRNpbkL7eoiVpe6wZw1nGL3T7GkrrWGRS3EQO
> > > >>> qp4Jjp+7yY4gIqsLfYouaHTEzAX7yN78QNUNCB4OqUiEL9+a8wTQ7dlTgXinEd8r
> > > >>> Kh9vTfpW7fb4c58aSpzntPUU4YFD3MHMam0iu5UrV9d5DrVTFDMJ83k15Z5DyTMt
> > > >>> 45nPYdjvJgFGWLYFnPwr
> > > >>> =VbpG
> > > >>> -END PGP SIGNATURE-
> > > >>>
> > > >>
> > > >>
> > > >>
> > > >> --
> > > >> Regards,
> > > >>
> > > >> Rajini
> > > >>
> > >
> > > --
> > Thanks,
> > Neha
> >
>


[jira] [Updated] (KAFKA-4386) Producer Metrics Explanation

2016-11-07 Thread Pratik kumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4386?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pratik kumar updated KAFKA-4386:

Description: 
Context :
Kafka Producer 0.8.x

Problem:
Kafka Producer emits metrics regarding request size stats, request latency and 
request rate stats.
But the inherent meaning of these metrics is not clear. What do they 
measure?
Is it for each producer send request (which contains batches of messages per 
broker)? Or is it for a batch of messages defined according to the user's batching 
policy? What happens when some application code has multiple async producers to 
increase performance (how are rate and percentiles measured)?

  was:
Context :
Kafka Producer 0.8.x

Problem:
Kafka Producer emits metrics regarding request size stats, request latency and 
rate stats.
But the inherent meaning of these metrics is not clear. What do they 
measure?
Is it for each producer send request (which contains batches of messages per 
broker)? Or is it for a batch of messages defined according to the user's batching 
policy? What happens when some application code has multiple async producers to 
increase performance (how are rate and percentiles measured)?


> Producer Metrics Explanation
> 
>
> Key: KAFKA-4386
> URL: https://issues.apache.org/jira/browse/KAFKA-4386
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Pratik kumar
>  Labels: producer
>
> Context :
> Kafka Producer 0.8.x
> Problem:
> Kafka Producer emits metrics regarding request size stats, request latency 
> and request rate stats.
> But the inherent meaning of these metrics is not clear. What do they 
> measure?
> Is it for each producer send request (which contains batches of messages per 
> broker)? Or is it for a batch of messages defined according to the user's batching 
> policy? What happens when some application code has multiple async producers 
> to increase performance (how are rate and percentiles measured)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4386) Producer Metrics Explanation

2016-11-07 Thread Pratik kumar (JIRA)
Pratik kumar created KAFKA-4386:
---

 Summary: Producer Metrics Explanation
 Key: KAFKA-4386
 URL: https://issues.apache.org/jira/browse/KAFKA-4386
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Reporter: Pratik kumar


Context :
Kafka Producer 0.8.x

Problem:
Kafka Producer emits metrics regarding request size stats, request latency and 
rate stats.
But the inherent meaning of these metrics is not clear. What do they 
measure?
Is it for each producer send request (which contains batches of messages per 
broker)? Or is it for a batch of messages defined according to the user's batching 
policy? What happens when some application code has multiple async producers to 
increase performance (how are rate and percentiles measured)?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4384) ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted message

2016-11-07 Thread Jun He (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15643648#comment-15643648
 ] 

Jun He commented on KAFKA-4384:
---

[~becket_qin] Thanks for reviewing this issue. Yes I will provide the patch 
soon.


> ReplicaFetcherThread stopped after ReplicaFetcherThread received a corrupted 
> message
> 
>
> Key: KAFKA-4384
> URL: https://issues.apache.org/jira/browse/KAFKA-4384
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1, 0.10.0.0, 0.10.0.1
> Environment: Ubuntu 12.04, AWS D2 instance
>Reporter: Jun He
>
> We recently discovered an issue in Kafka 0.9.0.1 where 
> ReplicaFetcherThread stopped after it received a corrupted 
> message. As the same logic also exists in Kafka 0.10.0.0 and 0.10.0.1, they 
> may have a similar issue.
> Here are system logs related to this issue.
> 2016-10-30 - 02:33:05,606 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.apply - Found invalid messages during fetch for 
> partition [logs,41] offset 39021512238 error Message is corrupt (stored crc = 
> 2028421553, computed crc = 3577227678)
> 2016-10-30 - 02:33:06,582 ERROR ReplicaFetcherThread-5-1590174474 
> ReplicaFetcherThread.error - [ReplicaFetcherThread-5-1590174474], Error due 
> to kafka.common.KafkaException: - error processing data for partition 
> [logs,41] offset 39021512301 Caused - by: java.lang.RuntimeException: Offset 
> mismatch: fetched offset = 39021512301, log end offset = 39021512238.
> First, ReplicaFetcherThread got a corrupted message (offset 39021512238) due 
> to some blip.
> Line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L138
>  threw exception
> Then, Line 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L145
>  caught it and logged this error.
> Because 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L134
>  updated the topic partition offset to the fetched latest one in 
> partitionMap. So ReplicaFetcherThread skipped the batch with corrupted 
> messages. 
> Based on 
> https://github.com/apache/kafka/blob/0.9.0.1/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L84,
>  the ReplicaFetcherThread then directly fetched the next batch of messages 
> (with offset 39021512301)
> Next, ReplicaFetcherThread stopped because the log end offset (still 
> 39021512238) didn't match the fetched message (offset 39021512301).
> A quick fix is to move line 134 to be after line 138.
> Would be great to have your comments and please let me know if a Jira issue 
> is needed. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)