Re: [DISCUSS] KIP-116 - Add State Store Checkpoint Interval Configuration

2017-02-13 Thread Guozhang Wang
I think I'm OK to always enable checkpointing, but I'm not sure if we want
to checkpoint on every commit. Since in the extreme case users can commit
on completed processing each record. So I think it is still valuable to
have a checkpoint internal config in this KIP, which can be ignored if EOS
is turned on. That being said, if most people are favoring checkpointing on
each commit we can try that with this as well, since it won't change any
public APIs and we can still add this config in the future if we do observe
some users reporting it has huge perf impacts.



Guozhang

On Fri, Feb 10, 2017 at 12:20 PM, Damian Guy  wrote:

> I'm fine with that. Gouzhang?
> On Fri, 10 Feb 2017 at 19:45, Matthias J. Sax 
> wrote:
>
> > I am actually supporting Eno's view: checkpoint on every commit.
> >
> > @Dhwani: I understand your view and did raise the same question about
> > performance trade-off with checkpoiting enabled/disabled etc. However,
> > it seems that writing the checkpoint file is super cheap -- thus, there
> > is nothing to gain performance wise by disabling it.
> >
> > For Streams EoS we do not need the checkpoint file -- but we should have
> > a switch for EoS anyway and can disable the checkpoint file for this
> > case. And even if there is no switch and we enable EoS all the time, we
> > can get rid of the checkpoint file overall (making the parameter
> obsolete).
> >
> > IMHO, if the config parameter is not really useful, we should not have
> it.
> >
> >
> > -Matthias
> >
> >
> > On 2/10/17 9:27 AM, Damian Guy wrote:
> > > Gouzhang, Thanks for the clarification. Understood.
> > >
> > > Eno, you are correct if we just used commit interval then we wouldn't
> > need
> > > a KIP. But, then we'd have no way of turning it off.
> > >
> > > On Fri, 10 Feb 2017 at 17:14 Eno Thereska 
> > wrote:
> > >
> > >> A quick check: the checkpoint file is not new, we're just exposing a
> > knob
> > >> on when to set it, right? Would turning if off still do what it does
> > today
> > >> (i.e., write the checkpoint at the end when the user quits?) So it's
> > not a
> > >> new feature as such, I was only recommending we dial up the frequency
> by
> > >> default. With that option arguably we don't even need a KIP.
> > >>
> > >> Eno
> > >>
> > >>
> > >>
> > >>> On 10 Feb 2017, at 17:02, Guozhang Wang  wrote:
> > >>>
> > >>> Damian,
> > >>>
> > >>> I was thinking if it is a new failure scenarios but as Eno pointed
> out
> > it
> > >>> was not.
> > >>>
> > >>> Another thing I was considering is if it has any impact for
> > incorporating
> > >>> KIP-98 to avoid duplicates: if there is a failure in the middle of a
> > >>> transaction, then upon recovery we cannot rely on the local state
> store
> > >>> file even if the checkpoint file exists, since the local state store
> > file
> > >>> may not be at the transaction boundaries. But since Streams will
> likely
> > >> to
> > >>> have EOS as an opt-in I think it is still worthwhile to add this
> > feature,
> > >>> just keeping in mind that when EOS is turned on it may cease to be
> > >>> effective.
> > >>>
> > >>> And yes, I'd suggest we leave the config value to be possibly
> > >> non-positive
> > >>> to indicate not turning on this feature for the reason above: if it
> > will
> > >>> not be effective then we want to leave it as an option to be turned
> > off.
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>> On Fri, Feb 10, 2017 at 8:06 AM, Eno Thereska <
> eno.there...@gmail.com>
> > >>> wrote:
> > >>>
> >  The overhead of writing to the checkpoint file should be much, much
> >  smaller than the overall overhead of doing a commit, so I think
> tuning
> > >> the
> >  commit time is sufficient to guide performance tradeoffs.
> > 
> >  Eno
> > 
> > > On 10 Feb 2017, at 13:08, Dhwani Katagade <
> > >> dhwani_katag...@persistent.co
> >  .in> wrote:
> > >
> > > May be for fine tuning the performance.
> > > Say we don't need the checkpointing and would like to gain the lil
> > bit
> >  of performance improvement by turning it off.
> > > The trade off is between giving people control knobs vs
> complicating
> > >> the
> >  complete set of knobs.
> > >
> > > -dk
> > >
> > > On Friday 10 February 2017 04:05 PM, Eno Thereska wrote:
> > >> I can't see why users would care to turn it off.
> > >>
> > >> Eno
> > >>> On 10 Feb 2017, at 10:29, Damian Guy 
> wrote:
> > >>>
> > >>> Hi Eno,
> > >>>
> > >>> Sounds good to me. The only reason i can think of is if we want
> to
> > be
> >  able
> > >>> to turn it off.
> > >>> Gouzhang - thoughts?
> > >>>
> > >>> On Fri, 10 Feb 2017 at 10:28 Eno Thereska <
> eno.there...@gmail.com>
> >  wrote:
> > >>>
> >  Question: if checkpointing is so cheap why not do it every
> commit
> >  interval? That way 

[jira] [Assigned] (KAFKA-4542) Add authentication based on delegation token.

2017-02-13 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy reassigned KAFKA-4542:
--

Assignee: Manikumar Reddy

> Add authentication based on delegation token.
> -
>
> Key: KAFKA-4542
> URL: https://issues.apache.org/jira/browse/KAFKA-4542
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Ashish Singh
>Assignee: Manikumar Reddy
>
> Add authentication based on delegation token.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-13 Thread Manikumar
Thank you all for your votes and feedback.

The vote has passed with 4 binding votes(Gwen, Jun, Grant, Harsha) and 2
non-binding votes(Roger, Dong Lin).

I have updated the relevant wiki pages.

Thanks
Manikumar

On Tue, Feb 14, 2017 at 12:02 AM, Dong Lin  wrote:

> +1 (non-binding)
>
> On Mon, Feb 13, 2017 at 10:21 AM, Harsha Chintalapani 
> wrote:
>
> > +1.
> > -Harsha
> >
> > On Fri, Feb 10, 2017 at 11:12 PM Manikumar 
> > wrote:
> >
> > > Yes, owners and the renewers can always describe their own tokens.
> > Updated
> > > the KIP.
> > >
> > > On Sat, Feb 11, 2017 at 3:12 AM, Jun Rao  wrote:
> > >
> > > > Hi, Mani,
> > > >
> > > > Thanks for the update. Just a minor comment below. Otherwise, +1 from
> > me.
> > > >
> > > >
> > > > >
> > > > > >
> > > > > > 116. Could you document the ACL rules associated with those new
> > > > requests?
> > > > > > For example, do we allow any one to create, delete, describe
> > > delegation
> > > > > > tokens?
> > > > > >
> > > > > >
> > > > > Currently we only allow a owner to create delegation token for that
> > > owner
> > > > > only.
> > > > > Any thing the owner has permission to do, delegation tokens should
> be
> > > > > allowed to do as well. We can also check renew and expire requests
> > are
> > > > > coming
> > > > > from owner or renewers of the token. So we may not need ACLs for
> > > > > create/renew/expire requests.
> > > > >
> > > > > For describe, we can add DESCRIBE operation on TOKEN Resource. In
> > > future,
> > > > > when we extend
> > > > > the support to allow a user to acquire delegation tokens for other
> > > users,
> > > > > then we can enable
> > > > > CREATE/DELETE operations. Updated the KIP.
> > > > >
> > > > >
> > > > This sounds good. I guess the owner and the renewer can always
> describe
> > > > their own tokens?
> > > >
> > > > Jun
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-13 Thread radai
1. making the client Closeable/AutoCloseable would allow try (Client = ...)
{} without the need to finally close.

2. a "stream processing unit" (producer + consumer) currently holds 2 open
sockets to every broker it interacts with, because producer and consumer
dont share the network stack. if we use the admin API to auto cleanup on
commit for intermediate pipelines (which is one of our use cases) this
figure goes up to 3 sockets per unit of processing per broker. beyond
becoming a scalability issue this (i think) might also introduce annoying
bugs due to some (but not all) of these connections being down. this is not
an issue of this KIP though.

On Mon, Feb 13, 2017 at 11:51 AM, Colin McCabe  wrote:

> On Sun, Feb 12, 2017, at 09:21, Jay Kreps wrote:
> > Hey Colin,
> >
> > Thanks for the hard work on this. I know going back and forth on APIs is
> > kind of frustrating but we're at the point where these things live long
> > enough and are used by enough people that it is worth the pain. I'm sure
> > it'll come down in the right place eventually. A couple things I've found
> > helped in the past:
> >
> >1. The burden of evidence needs to fall on the complicator. i.e. if
> >person X thinks the api should be async they need to produce a set of
> >common use cases that require this. Otherwise you are perpetually
> >having to
> >think "we might need x". I think it is good to have a rule of "simple
> >until
> >proven insufficient".
> >2. Make sure we frame things for the intended audience. At this point
> >our apis get used by a very broad set of Java engineers. This is a
> >very
> >different audience from our developer mailing list. These people code
> >for a
> >living not necessarily as a passion, and may not understand details of
> >the
> >internals of our system or even basic things like multi-threaded
> >programming. I don't think this means we want to dumb things down, but
> >rather try really hard to make things truly simple when possible.
> >
> > Okay here were a couple of comments:
> >
> >1. Conceptually what is a TopicContext? I think it means something
> >like
> >TopicAdmin? It is not literally context about Topics right? What is
> >the
> >relationship of Contexts to clients? Is there a threadsafety
> >difference?
> >Would be nice to not have to think about this, this is what I mean by
> >"conceptual weight". We introduce a new concept that is a bit nebulous
> >that
> >I have to figure out to use what could be a simple api. I'm sure
> >you've
> >been through this experience before where you have these various
> >objects
> >and you're trying to figure out what they represent (the connection to
> >the
> >server? the information to create a connection? a request session?).
>
> The intention was to provide some grouping of methods, and also a place
> to put request parameters which were often set to defaults rather than
> being explicitly set.  If it seems complex, we can certainly get rid of
> it.
>
> >2. We've tried to avoid the Impl naming convention. In general the
> >rule
> >has been if there is only going to be one implementation you don't
> >need an
> >interface. If there will be multiple, distinguish it from the others.
> >The
> >other clients follow this pattern: Producer, KafkaProducer,
> >MockProducer;
> >Consumer, KafkaConsumer, MockConsumer.
>
> Good point.  Let's change the interface to KafkaAdminClient, and the
> implementation to AdminClient.
>
> >3. We generally don't use setters or getters as a naming convention. I
> >personally think mutating the setting in place seems kind of like late
> >90s
> >Java style. I think it likely has thread-safety issues. i.e. even if
> >it is
> >volatile you may not get the value you just set if there is another
> >thread... I actually really liked what you described as your original
> >idea
> >of having a single parameter object like CreateTopicRequest that holds
> >all
> >these parameters and defaults. This lets you evolve the api with all
> >the
> >various combinations of arguments without overloading insanity. After
> >doing
> >literally tens of thousands of remote APIs at LinkedIn we eventually
> >converged on a rule, which is ultimately every remote api needs a
> >single
> >argument object you can add to over time and it must be batched. Which
> >brings me to my next point...
>
> Just to clarify, volatiles were never a part of the proposal.  I think
> that context objects or request objects should be used by a single
> thread at a time.
>
> I'm not opposed to request objects, but I think they raise all the same
> questions as context objects.  Basically, the thread-safety issues need
> to be spelled out and understood by the user, and the user needs more
> lines of code to make a request. 

[jira] [Commented] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-13 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864954#comment-15864954
 ] 

huxi commented on KAFKA-4762:
-

Logs show that you are using 0.10.x where max.partition.fetch.bytes is a hard 
limit even when you enable the compression. In your case, seems that you have 
enabled the compression on the producer side. `max.partition.fetch.bytes` also 
applies to the whole compressed message which is often much larger than a 
single one. That's why you run into RecordTooLargeException.

0.10.1 which completes 
[KIP-74|https://cwiki.apache.org/confluence/display/KAFKA/KIP-74:+Add+Fetch+Response+Size+Limit+in+Bytes]
 already 'fixes' your problem by making  `max.partition.fetch.bytes` field in 
the fetch request much less useful, so you can try with an 0.10.1 build.


> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vipul Singh
>
> We were just recently hit by a weird error. 
> Before going in any further, explaining of our service setup. we have a 
> producer which produces messages not larger than 256 kb of messages( we have 
> an explicit check about this on the producer side) and on the client side we 
> have a fetch limit of 512kb(max.partition.fetch.bytes is set to 524288 bytes) 
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925056036} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it consumed fine. The messages were 
> small, and did not seem to be greater than 256 kb
> We took a log dump, and the log size looked fine.
> {quote}
> mpresscodec: NoCompressionCodec crc: 2473548911 keysize: 8
> offset: 9925056032 position: 191380053 isvalid: true payloadsize: 539 magic: 
> 0 compresscodec: NoCompressionCodec crc: 1656420267 keysize: 8
> offset: 9925056033 position: 191380053 isvalid: true payloadsize: 1551 magic: 
> 0 compresscodec: NoCompressionCodec crc: 2398479758 keysize: 8
> offset: 9925056034 position: 191380053 isvalid: true payloadsize: 1307 magic: 
> 0 compresscodec: NoCompressionCodec crc: 2845554215 keysize: 8
> offset: 9925056035 position: 191380053 isvalid: true payloadsize: 1520 magic: 
> 0 compresscodec: NoCompressionCodec crc: 3106984195 keysize: 8
> offset: 9925056036 position: 191713371 isvalid: true payloadsize: 1207 magic: 
> 0 compresscodec: NoCompressionCodec crc: 3462154435 keysize: 8
> offset: 9925056037 position: 191713371 isvalid: true payloadsize: 418 magic: 
> 0 compresscodec: NoCompressionCodec crc: 1536701802 keysize: 8
> offset: 9925056038 position: 191713371 isvalid: true payloadsize: 299 magic: 
> 0 compresscodec: NoCompressionCodec crc: 4112567543 keysize: 8
> offset: 9925056039 position: 191713371 isvalid: true payloadsize: 1571 magic: 
> 0 compresscodec: NoCompressionCodec crc: 3696994307 keysize: 8
> {quote}
> Has anyone seen something similar? or any points to troubleshoot this further
> Please Note: To overcome this issue, we deployed a new consumer, without this 
> limit of max.partition.fetch.bytes, and it worked fine.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-13 Thread radai
if i understand correctly, kafka-acls.sh spins up an instance of (the
custom, in our case) Authorizer, and calls things like addAcls(acls:
Set[Acl], resource: Resource) on it, which are defined in the interface,
hence expected to be "extensible".

(side note: if Authorizer and PrincipalBuilder are defined as extensible
interfaces, why doesnt class Acl, which is in the signature for Authorizer
calls, use java.security.Principal?)

we would like to be able to use the standard kafka-acl command line for
defining ACLs even when replacing the vanilla Authorizer and
PrincipalBuilder (even though we have a management UI for these operations
within linkedin) - simply because thats the correct thing to do from an
extensibility point of view.

On Mon, Feb 13, 2017 at 1:39 PM, Jun Rao  wrote:

> Hi, Mayuresh,
>
> I seems to me that there are two common use cases of authorizer. (1) Use
> the default SimpleAuthorizer and the kafka-acl to do authorization. (2) Use
> a customized authorizer and an external tool for authorization. Do you
> think there is a use case for a customized authorizer and kafka-acl at the
> same time? If not, it's better not to complicate the kafka-acl api.
>
> Thanks,
>
> Jun
>
>
>
> On Mon, Feb 13, 2017 at 10:35 AM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Thanks for the review and comments. Please find the replies inline :
> >
> > This is so that in the future, we can extend to types like group.
> > ---> Yep, I did think the same. But since the SocketServer was always
> > creating User type, it wasn't actually used. If we go ahead with changes
> in
> > this KIP, we will give this power of creating different Principal types
> to
> > the PrincipalBuilder (which users can define there own). In that way
> Kafka
> > will not have to deal with handling this. So the Principal building and
> > Authorization will be opaque to Kafka which seems like an expected
> > behavior.
> >
> >
> > Hmm, normally, the configurations you specify for plug-ins refer to those
> > needed to construct the plug-in object. So, it's kind of weird to use
> that
> > to call a method. For example, why can't principalBuilderService.rest.
> url
> > be passed in through the configure() method and the implementation can
> use
> > that to build principal. This way, there is only a single method to
> compute
> > the principal in a consistent way in the broker and in the kafka-acl
> tool.
> > > We can do that as well. But since the rest url is not related to
> the
> > Principal, it seems out of place to me to pass it every time we have to
> > create a Principal. I should replace "principalConfigs" with
> > "principalProperties".
> > I was trying to differentiate the configs/properties that are used to
> > create the PrincipalBuilder class and the Principal/Principals itself.
> >
> >
> > For LinkedIn's use case, do you actually use the kafka-acl tool? My
> > understanding is that LinkedIn does authorization through an external
> tool.
> > > For Linkedin's use case we don't actually use the kafka-acl tool
> > right now. As per the discussion that we had on
> > https://issues.apache.org/jira/browse/KAFKA-4454, we thought that it
> would
> > be good to make kafka-acl tool changes, to make it flexible and we might
> be
> > even able to use it in future.
> >
> > It seems it's simpler if kafka-acl doesn't to need to understand the
> > principal builder. The tool does authorization based on a string name,
> > which is expected to match the principal name. So, I am wondering why the
> > tool needs to know the principal builder.
> > > If we don't make this change, I am not sure how clients/end users
> > will be able to use this tool if they have there own Authorizer that does
> > Authorization based on Principal, that has more information apart from
> name
> > and type.
> >
> > What if we only make the following changes: pass the java principal in
> > session and in
> > SimpleAuthorizer, construct KafkaPrincipal from java principal name. Will
> > that work for LinkedIn?
> > > This can work for Linkedin but as explained above, it does not seem
> > like a complete design from open source point of view.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Thu, Feb 9, 2017 at 11:29 AM, Jun Rao  wrote:
> >
> > > Hi, Mayuresh,
> > >
> > > Thanks for the reply. A few more comments below.
> > >
> > > On Wed, Feb 8, 2017 at 9:14 PM, Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > Thanks for the review. Please find the responses inline.
> > > >
> > > > 1. It seems the problem that you are trying to address is that java
> > > > principal returned from KafkaChannel may have additional fields than
> > name
> > > > that are needed during authorization. Have you considered a
> customized
> > > > PrincipleBuilder that extracts all needed fields from java principal
> > and
> > > > squeezes them as a json in the name of the 

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-13 Thread Dong Lin
And the test plan has also been updated to simulate disk failure by
changing log directory permission to 000.

On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin  wrote:

> Hi Jun,
>
> Thanks for the reply. These comments are very helpful. Let me answer them
> inline.
>
>
> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao  wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply. A few more replies and new comments below.
>>
>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin  wrote:
>>
>> > Hi Jun,
>> >
>> > Thanks for the detailed comments. Please see answers inline:
>> >
>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao  wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Thanks for the updated wiki. A few comments below.
>> > >
>> > > 1. Topics get created
>> > > 1.1 Instead of storing successfully created replicas in ZK, could we
>> > store
>> > > unsuccessfully created replicas in ZK? Since the latter is less
>> common,
>> > it
>> > > probably reduces the load on ZK.
>> > >
>> >
>> > We can store unsuccessfully created replicas in ZK. But I am not sure if
>> > that can reduce write load on ZK.
>> >
>> > If we want to reduce write load on ZK using by store unsuccessfully
>> created
>> > replicas in ZK, then broker should not write to ZK if all replicas are
>> > successfully created. It means that if /broker/topics/[topic]/partiti
>> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a
>> given
>> > partition, we have to assume all replicas of this partition have been
>> > successfully created and send LeaderAndIsrRequest with create = false.
>> This
>> > becomes a problem if controller crashes before receiving
>> > LeaderAndIsrResponse to validate whether a replica has been created.
>> >
>> > I think this approach and reduce the number of bytes stored in ZK. But
>> I am
>> > not sure if this is a concern.
>> >
>> >
>> >
>> I was mostly concerned about the controller failover time. Currently, the
>> controller failover is likely dominated by the cost of reading
>> topic/partition level information from ZK. If we add another partition
>> level path in ZK, it probably will double the controller failover time. If
>> the approach of representing the non-created replicas doesn't work, have
>> you considered just adding the created flag in the leaderAndIsr path in
>> ZK?
>>
>>
> Yes, I have considered adding the created flag in the leaderAndIsr path in
> ZK. If we were to add created flag per replica in the LeaderAndIsrRequest,
> then it requires a lot of change in the code base.
>
> If we don't add created flag per replica in the LeaderAndIsrRequest, then
> the information in leaderAndIsr path in ZK and LeaderAndIsrRequest would be
> different. Further, the procedure for broker to update ISR in ZK will be a
> bit complicated. When leader updates leaderAndIsr path in ZK, it will have
> to first read created flags from ZK, change isr, and write leaderAndIsr
> back to ZK. And it needs to check znode version and re-try write operation
> in ZK if controller has updated ZK during this period. This is in contrast
> to the current implementation where the leader either gets all the
> information from LeaderAndIsrRequest sent by controller, or determine the
> infromation by itself (e.g. ISR), before writing to leaderAndIsr path in ZK.
>
> It seems to me that the above solution is a bit complicated and not clean.
> Thus I come up with the design in this KIP to store this created flag in a
> separate zk path. The path is named controller_managed_state to indicate
> that we can store in this znode all information that is managed by
> controller only, as opposed to ISR.
>
> I agree with your concern of increased ZK read time during controller
> failover. How about we store the "created" information in the
> znode /brokers/topics/[topic]? We can change that znode to have the
> following data format:
>
> {
>   "version" : 2,
>   "created" : {
> "1" : [1, 2, 3],
> ...
>   }
>   "partition" : {
> "1" : [1, 2, 3],
> ...
>   }
> }
>
> We won't have extra zk read using this solution. It also seems reasonable
> to put the partition assignment information together with replica creation
> information. The latter is only changed once after the partition is created
> or re-assigned.
>
>
>>
>>
>> >
>> > > 1.2 If an error is received for a follower, does the controller
>> eagerly
>> > > remove it from ISR or do we just let the leader removes it after
>> timeout?
>> > >
>> >
>> > No, Controller will not actively remove it from ISR. But controller will
>> > recognize it as offline replica and propagate this information to all
>> > brokers via UpdateMetadataRequest. Each leader can use this information
>> to
>> > actively remove offline replica from ISR set. I have updated to wiki to
>> > clarify it.
>> >
>> >
>>
>> That seems inconsistent with how the controller deals with offline
>> replicas
>> due to broker failures. When that happens, the broker will (1) select a

Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-13 Thread Dong Lin
Hi Jun,

Thanks for the reply. These comments are very helpful. Let me answer them
inline.


On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao  wrote:

> Hi, Dong,
>
> Thanks for the reply. A few more replies and new comments below.
>
> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin  wrote:
>
> > Hi Jun,
> >
> > Thanks for the detailed comments. Please see answers inline:
> >
> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao  wrote:
> >
> > > Hi, Dong,
> > >
> > > Thanks for the updated wiki. A few comments below.
> > >
> > > 1. Topics get created
> > > 1.1 Instead of storing successfully created replicas in ZK, could we
> > store
> > > unsuccessfully created replicas in ZK? Since the latter is less common,
> > it
> > > probably reduces the load on ZK.
> > >
> >
> > We can store unsuccessfully created replicas in ZK. But I am not sure if
> > that can reduce write load on ZK.
> >
> > If we want to reduce write load on ZK using by store unsuccessfully
> created
> > replicas in ZK, then broker should not write to ZK if all replicas are
> > successfully created. It means that if /broker/topics/[topic]/partiti
> > ons/[partitionId]/controller_managed_state doesn't exist in ZK for a
> given
> > partition, we have to assume all replicas of this partition have been
> > successfully created and send LeaderAndIsrRequest with create = false.
> This
> > becomes a problem if controller crashes before receiving
> > LeaderAndIsrResponse to validate whether a replica has been created.
> >
> > I think this approach and reduce the number of bytes stored in ZK. But I
> am
> > not sure if this is a concern.
> >
> >
> >
> I was mostly concerned about the controller failover time. Currently, the
> controller failover is likely dominated by the cost of reading
> topic/partition level information from ZK. If we add another partition
> level path in ZK, it probably will double the controller failover time. If
> the approach of representing the non-created replicas doesn't work, have
> you considered just adding the created flag in the leaderAndIsr path in ZK?
>
>
Yes, I have considered adding the created flag in the leaderAndIsr path in
ZK. If we were to add created flag per replica in the LeaderAndIsrRequest,
then it requires a lot of change in the code base.

If we don't add created flag per replica in the LeaderAndIsrRequest, then
the information in leaderAndIsr path in ZK and LeaderAndIsrRequest would be
different. Further, the procedure for broker to update ISR in ZK will be a
bit complicated. When leader updates leaderAndIsr path in ZK, it will have
to first read created flags from ZK, change isr, and write leaderAndIsr
back to ZK. And it needs to check znode version and re-try write operation
in ZK if controller has updated ZK during this period. This is in contrast
to the current implementation where the leader either gets all the
information from LeaderAndIsrRequest sent by controller, or determine the
infromation by itself (e.g. ISR), before writing to leaderAndIsr path in ZK.

It seems to me that the above solution is a bit complicated and not clean.
Thus I come up with the design in this KIP to store this created flag in a
separate zk path. The path is named controller_managed_state to indicate
that we can store in this znode all information that is managed by
controller only, as opposed to ISR.

I agree with your concern of increased ZK read time during controller
failover. How about we store the "created" information in the
znode /brokers/topics/[topic]? We can change that znode to have the
following data format:

{
  "version" : 2,
  "created" : {
"1" : [1, 2, 3],
...
  }
  "partition" : {
"1" : [1, 2, 3],
...
  }
}

We won't have extra zk read using this solution. It also seems reasonable
to put the partition assignment information together with replica creation
information. The latter is only changed once after the partition is created
or re-assigned.


>
>
> >
> > > 1.2 If an error is received for a follower, does the controller eagerly
> > > remove it from ISR or do we just let the leader removes it after
> timeout?
> > >
> >
> > No, Controller will not actively remove it from ISR. But controller will
> > recognize it as offline replica and propagate this information to all
> > brokers via UpdateMetadataRequest. Each leader can use this information
> to
> > actively remove offline replica from ISR set. I have updated to wiki to
> > clarify it.
> >
> >
>
> That seems inconsistent with how the controller deals with offline replicas
> due to broker failures. When that happens, the broker will (1) select a new
> leader if the offline replica is the leader; (2) remove the replica from
> ISR if the offline replica is the follower. So, intuitively, it seems that
> we should be doing the same thing when dealing with offline replicas due to
> disk failure.
>

My bad. I misunderstand how the controller currently handles broker failure
and ISR change. Yes we should 

Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Guozhang Wang
Hi Ian,

Thanks for the responses. Another suggestion I had is about your
session.timeout.ms config value (currently set to 10 seconds): since Nina
meaned that a processing of a single record could be as long as minutes,
that session timeout would be insufficient as no heartbeats will be sent by
the streams' consumer and the coordinator may mark it as dead. This could
probably be the reason of unexpected rebalances as well, due to the false
positives of failure detection.


Guozhang


On Mon, Feb 13, 2017 at 4:22 PM, Matthias J. Sax 
wrote:

> Ian,
>
> an important hint: it is highly recommended to change "state.dir"
> configuration parameter from "/tmp/kafka-streams" to a different
> directory. It might be, that /tmp gets deleted and thus you loose all
> your cached data.
>
> While this is no an issues with regard to data loss (as all data is
> reliably store in Kafka brokers) it might give you a performance penalty
> as the state need to get re-created.
>
> Furthermore, if the application is running and /tmp gets deleted, your
> app instance might crash, as RocksDB relies on the data it writes to disc.
>
> Could this be related to your problem?
>
>
> -Matthias
>
> On 2/13/17 2:10 PM, Ian Duffy wrote:
> > Hi Guozhang,
> >
> > Thank you for your assistance on this.
> >
> >> About the stack trace pasted before: is it tailing some warning logs
> like
> > "Could not create task ... Will retry"
> >
> > Yes, we see the following:
> >
> > 17/02/13 21:49:55 WARN internals.StreamThread: Could not create task
> 0_93.
> > Will retry.
> > org.apache.kafka.streams.errors.LockException: task [0_93] Failed to
> lock
> > the state directory: /tmp/kafka-streams/text_pipeline_id/0_93
> > #011at
> > org.apache.kafka.streams.processor.internals.
> ProcessorStateManager.(ProcessorStateManager.java:102)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.AbstractTask.(
> AbstractTask.java:73)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.
> StreamTask.(StreamTask.java:108)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.createStreamTask(StreamThread.java:834)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.
> createTask(StreamThread.java:1207)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StreamThread$
> AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.addStreamTasks(StreamThread.java:937)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StreamThread.access$500(
> StreamThread.java:69)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StreamThread$1.
> onPartitionsAssigned(StreamThread.java:236)
> >
> > #011at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.
> onJoinComplete(ConsumerCoordinator.java:255)
> >
> > #011at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> joinGroupIfNeeded(AbstractCoordinator.java:339)
> >
> > #011at
> > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> ensureActiveGroup(AbstractCoordinator.java:303)
> >
> > #011at
> > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> ConsumerCoordinator.java:286)
> >
> > #011at
> > org.apache.kafka.clients.consumer.KafkaConsumer.
> pollOnce(KafkaConsumer.java:1030)
> >
> > #011at
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> KafkaConsumer.java:995)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:582)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.
> StreamThread.run(StreamThread.java:368)
> >
> >
> > It is always partition 93, additionally, we've seen processing of all the
> > other partitions without issue in between the crashes but the offset for
> 93
> > hasn't moved.
> >
> >> it is thrown as a exception all they way to the thread's
> > setUncaughtExceptionHandler, in which case your code should have a
> > `Unexpected Exception caught in thread` error log entry and the thread
> will
> > die.
> >
> > We have seen this too:
> >
> > org.apache.kafka.streams.errors.ProcessorStateException: task directory
> > [/tmp/kafka-streams/dougal_text_pipeline_id/0_94] doesn't exist and
> > couldn't be created
> > #011at
> > org.apache.kafka.streams.processor.internals.StateDirectory.
> directoryForTask(StateDirectory.java:75)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StateDirectory.lock(
> StateDirectory.java:102)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StateDirectory.
> cleanRemovedTasks(StateDirectory.java:205)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(
> StreamThread.java:753)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> StreamThread.java:664)
> >
> > #011at
> > org.apache.kafka.streams.processor.internals.
> 

Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Matthias J. Sax
Ian,

an important hint: it is highly recommended to change "state.dir"
configuration parameter from "/tmp/kafka-streams" to a different
directory. It might be, that /tmp gets deleted and thus you loose all
your cached data.

While this is no an issues with regard to data loss (as all data is
reliably store in Kafka brokers) it might give you a performance penalty
as the state need to get re-created.

Furthermore, if the application is running and /tmp gets deleted, your
app instance might crash, as RocksDB relies on the data it writes to disc.

Could this be related to your problem?


-Matthias

On 2/13/17 2:10 PM, Ian Duffy wrote:
> Hi Guozhang,
> 
> Thank you for your assistance on this.
> 
>> About the stack trace pasted before: is it tailing some warning logs like
> "Could not create task ... Will retry"
> 
> Yes, we see the following:
> 
> 17/02/13 21:49:55 WARN internals.StreamThread: Could not create task 0_93.
> Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_93] Failed to lock
> the state directory: /tmp/kafka-streams/text_pipeline_id/0_93
> #011at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102)
> 
> #011at
> org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> 
> #011at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> 
> #011at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> 
> #011at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 
> #011at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> 
> #011at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> 
> #011at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> 
> 
> It is always partition 93, additionally, we've seen processing of all the
> other partitions without issue in between the crashes but the offset for 93
> hasn't moved.
> 
>> it is thrown as a exception all they way to the thread's
> setUncaughtExceptionHandler, in which case your code should have a
> `Unexpected Exception caught in thread` error log entry and the thread will
> die.
> 
> We have seen this too:
> 
> org.apache.kafka.streams.errors.ProcessorStateException: task directory
> [/tmp/kafka-streams/dougal_text_pipeline_id/0_94] doesn't exist and
> couldn't be created
> #011at
> org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:75)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:102)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:205)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:753)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:664)
> 
> #011at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> 
> 17/02/13 21:11:01 INFO internals.StreamThread: stream-thread
> [StreamThread-1] Committing all tasks because the commit interval 3ms
> has elapsed
> 17/02/13 21:11:01 INFO internals.StreamThread: stream-thread
> [StreamThread-1] Committing task StreamTask 0_70
> 17/02/13 21:11:01 INFO internals.StateDirectory: Deleting obsolete state
> directory 0_61 for task 0_61
> 
> Our initial thoughts with "doesn't exist and couldn't be created" made us
> think it might be a disk space or permissions issue but we confirmed this
> not to be the case.
> 
>> Could you monitor the number of live stream threads during the runtime
> (originally there should be 8 according to your configs) and see if the
> count has decreased, meaning some threads have been dead?
> 
> I'll try get some data on this for you 

Re: Need suggestion on sending XML files via kafka

2017-02-13 Thread Ratha v
This Sample program may help you?
http://vvratha.blogspot.com.au/2016/07/sample-kafka-producer-and-consumer.html

On 14 February 2017 at 06:36, Prashanth Venkatesan <
prashanth.181...@gmail.com> wrote:

> Hi Team,
>
> I just started using Kafka. I have a usecase to send XML file or Document
> object via Kafka topic using Java. Can you enlight me with the guidance
> steps to achieve it??
>
> Please apologize and ignore if I am posting to inappropriate mail address.
>
> Thanks
> Prashanth
> +91-9677103475
> India
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: [DISCUSS] KIP-112: Handle disk failure for JBOD

2017-02-13 Thread Jun Rao
Hi, Dong,

Thanks for the reply. A few more replies and new comments below.

On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin  wrote:

> Hi Jun,
>
> Thanks for the detailed comments. Please see answers inline:
>
> On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > Thanks for the updated wiki. A few comments below.
> >
> > 1. Topics get created
> > 1.1 Instead of storing successfully created replicas in ZK, could we
> store
> > unsuccessfully created replicas in ZK? Since the latter is less common,
> it
> > probably reduces the load on ZK.
> >
>
> We can store unsuccessfully created replicas in ZK. But I am not sure if
> that can reduce write load on ZK.
>
> If we want to reduce write load on ZK using by store unsuccessfully created
> replicas in ZK, then broker should not write to ZK if all replicas are
> successfully created. It means that if /broker/topics/[topic]/partiti
> ons/[partitionId]/controller_managed_state doesn't exist in ZK for a given
> partition, we have to assume all replicas of this partition have been
> successfully created and send LeaderAndIsrRequest with create = false. This
> becomes a problem if controller crashes before receiving
> LeaderAndIsrResponse to validate whether a replica has been created.
>
> I think this approach and reduce the number of bytes stored in ZK. But I am
> not sure if this is a concern.
>
>
>
I was mostly concerned about the controller failover time. Currently, the
controller failover is likely dominated by the cost of reading
topic/partition level information from ZK. If we add another partition
level path in ZK, it probably will double the controller failover time. If
the approach of representing the non-created replicas doesn't work, have
you considered just adding the created flag in the leaderAndIsr path in ZK?



>
> > 1.2 If an error is received for a follower, does the controller eagerly
> > remove it from ISR or do we just let the leader removes it after timeout?
> >
>
> No, Controller will not actively remove it from ISR. But controller will
> recognize it as offline replica and propagate this information to all
> brokers via UpdateMetadataRequest. Each leader can use this information to
> actively remove offline replica from ISR set. I have updated to wiki to
> clarify it.
>
>

That seems inconsistent with how the controller deals with offline replicas
due to broker failures. When that happens, the broker will (1) select a new
leader if the offline replica is the leader; (2) remove the replica from
ISR if the offline replica is the follower. So, intuitively, it seems that
we should be doing the same thing when dealing with offline replicas due to
disk failure.



>
> > 1.3 Similar, if an error is received for a leader, should the controller
> > trigger leader election again?
> >
>
> Yes, controller will trigger leader election if leader replica is offline.
> I have updated the wiki to clarify it.
>
>
> >
> > 2. A log directory stops working on a broker during runtime:
> > 2.1 It seems the broker remembers the failed directory after hitting an
> > IOException and the failed directory won't be used for creating new
> > partitions until the broker is restarted? If so, could you add that to
> the
> > wiki.
> >
>
> Right, broker assumes a log directory to be good after it starts, and mark
> log directory as bad once there is IOException when broker attempts to
> access the log directory. New replicas will only be created on good log
> directory. I just added this to the KIP.
>
>
> > 2.2 Could you be a bit more specific on how and during which operation
> the
> > broker detects directory failure? Is it when the broker hits an
> IOException
> > during writes, or both reads and writes?  For example, during broker
> > startup, it only reads from each of the log directories, if it hits an
> > IOException there, does the broker immediately mark the directory as
> > offline?
> >
>
> Broker marks log directory as bad once there is IOException when broker
> attempts to access the log directory. This includes read and write. These
> operations include log append, log read, log cleaning, watermark checkpoint
> etc. If broker hits IOException when it reads from each of the log
> directory during startup, it immediately mark the directory as offline.
>
> I just updated the KIP to clarify it.
>
>
> > 3. Partition reassignment: If we know a replica is offline, do we still
> > want to send StopReplicaRequest to it?
> >
>
> No, controller doesn't send StopReplicaRequest for an offline replica.
> Controller treats this scenario in the same way that exiting Kafka
> implementation does when the broker of this replica is offline.
>
>
> >
> > 4. UpdateMetadataRequestPartitionState: For offline_replicas, do they
> only
> > include offline replicas due to log directory failures or do they also
> > include offline replicas due to broker failure?
> >
>
> UpdateMetadataRequestPartitionState's offline_replicas include 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-13 Thread Apurva Mehta
Hi Jun,

Thanks for the comments.

200.1, 200.2, 200.3 were all oversights which have been fixed.

201.1:
This has been added to the BeginTxnRequest now. If the
transaction.timeout.ms value is greater than max.transaction.timeout.ms,
then the BeginTxnRequest will fail with a `InvalidTransactionTimeout`
error. The document has been updated to reflect this.

201.2:
I added configurations for number of partitions, number of replicas, and
segment size. I don't think the compression configuration makes sense
because each Message set in the transaction log will have exactly one
message, and compression won't really buy us much. Do let me know if you
disagree.

201.3:
The default has been updated to `read_uncommitted`, which is the new name
for `all`.



On Mon, Feb 13, 2017 at 1:21 PM, Jun Rao  wrote:

> Hi, Guozhang,
>
> Thanks for the proposal. I made a pass of the wiki and had the following
> comments.
>
> 200. Message format:
> 200.1 MaxTimestampDelta: Does that need to be delta since it's always a
> fixed size in64?
> 200.2 The wiki says "At the end we still maintains a message-level CRC". Is
> that still valid?
> 200.3 In the ProducerRequest, do we need messageSet size?
> 200.4 One of the things that we may want to add in the future is KIP-82
> (per record header). It would be useful to think a bit how easy it is to
> support that with the new message format.
>
>
> 201. Configurations:
> 201.1 transaction.timeout.ms in the producer: It seems that it's missing
> in
> BeginTxnRequest? Also, what happens when the value is larger than
> max.transaction.timeout.ms on the broker?
> 201.2 For the internal transactional topic, do we need additional broker
> side configurations to control # of partitions, # of replicas, compression
> codec, segment size like the offset topic?
> 201.3 isolation.level: It says the default is "all", but there is no option
> for "all".
>
> Thanks,
>
> Jun
>
> On Wed, Feb 1, 2017 at 8:13 PM, Guozhang Wang  wrote:
>
> > Hi all,
> >
> > We would like to start the voting process for KIP-98. The KIP can be
> found
> > at
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> >
> > Discussion thread can be found here:
> >
> > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> > DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> >
> > Thanks,
> >
> > --
> > -- Guozhang
> >
>


[jira] [Created] (KAFKA-4763) Handle disk failure for JBOD (KIP-112)

2017-02-13 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-4763:
---

 Summary: Handle disk failure for JBOD (KIP-112)
 Key: KAFKA-4763
 URL: https://issues.apache.org/jira/browse/KAFKA-4763
 Project: Kafka
  Issue Type: Improvement
Reporter: Dong Lin
Assignee: Dong Lin


See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD
 for motivation and design.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4761:
---
Resolution: Fixed
Status: Resolved  (was: Patch Available)

Issue resolved by pull request 2545
[https://github.com/apache/kafka/pull/2545]

> NullPointerException if batch.size=0 for producer config
> 
>
> Key: KAFKA-4761
> URL: https://issues.apache.org/jira/browse/KAFKA-4761
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
> Environment: jdk1.8.0_40
>Reporter: Swen Moczarski
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 0.10.2.0
>
> Attachments: KafkaProducerTest.java
>
>
> When setting {{batch.size}} to {{0}} for producer I get the following 
> exception when sending a record:
> {noformat}
> java.lang.NullPointerException
>   at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
>   at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
> {noformat}
> But documentation says:
> {quote}
>  ... (a batch size of zero will disable batching entirely).
> {quote}
> Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Ian Duffy
Hi Guozhang,

Thank you for your assistance on this.

> About the stack trace pasted before: is it tailing some warning logs like
"Could not create task ... Will retry"

Yes, we see the following:

17/02/13 21:49:55 WARN internals.StreamThread: Could not create task 0_93.
Will retry.
org.apache.kafka.streams.errors.LockException: task [0_93] Failed to lock
the state directory: /tmp/kafka-streams/text_pipeline_id/0_93
#011at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102)

#011at
org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73)

#011at
org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)

#011at
org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)

#011at
org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)

#011at
org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)

#011at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)

#011at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)

#011at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)

#011at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)

#011at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)

#011at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)


It is always partition 93, additionally, we've seen processing of all the
other partitions without issue in between the crashes but the offset for 93
hasn't moved.

> it is thrown as a exception all they way to the thread's
setUncaughtExceptionHandler, in which case your code should have a
`Unexpected Exception caught in thread` error log entry and the thread will
die.

We have seen this too:

org.apache.kafka.streams.errors.ProcessorStateException: task directory
[/tmp/kafka-streams/dougal_text_pipeline_id/0_94] doesn't exist and
couldn't be created
#011at
org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:75)

#011at
org.apache.kafka.streams.processor.internals.StateDirectory.lock(StateDirectory.java:102)

#011at
org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:205)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.maybeClean(StreamThread.java:753)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:664)

#011at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

17/02/13 21:11:01 INFO internals.StreamThread: stream-thread
[StreamThread-1] Committing all tasks because the commit interval 3ms
has elapsed
17/02/13 21:11:01 INFO internals.StreamThread: stream-thread
[StreamThread-1] Committing task StreamTask 0_70
17/02/13 21:11:01 INFO internals.StateDirectory: Deleting obsolete state
directory 0_61 for task 0_61

Our initial thoughts with "doesn't exist and couldn't be created" made us
think it might be a disk space or permissions issue but we confirmed this
not to be the case.

> Could you monitor the number of live stream threads during the runtime
(originally there should be 8 according to your configs) and see if the
count has decreased, meaning some threads have been dead?

I'll try get some data on this for you tomorrow.

> When you reported that the process "just hang on attempting to rejoin",
how long have you observed it hanging before killed the process?

With the 10.1.1 client we waited up to an hour, it fails to recover at all.

With the 10.2 client today it consistently self-recovered but was quickly
switching back into a rebalancing state.
We've been unable to identify a cause for rebalancing so frequently. We
thought it be because processing time can potentially be pretty long
(greater than 5min less than10min)

> Could you try with fewer number of partitions, from 96 to 8 just for the
purpose of trouble shooting to see if this issue still exists?

Will try this tomorrow, I would expect we would see a lot less rebalancing
with fewer partitions.

Thanks again for all your help.
Ian.

On 13 February 2017 at 19:55, Guozhang Wang  

[GitHub] kafka pull request #2545: KAFKA-4761: Fix producer regression handling small...

2017-02-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2545


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864550#comment-15864550
 ] 

ASF GitHub Bot commented on KAFKA-4761:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2545


> NullPointerException if batch.size=0 for producer config
> 
>
> Key: KAFKA-4761
> URL: https://issues.apache.org/jira/browse/KAFKA-4761
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
> Environment: jdk1.8.0_40
>Reporter: Swen Moczarski
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 0.10.2.0
>
> Attachments: KafkaProducerTest.java
>
>
> When setting {{batch.size}} to {{0}} for producer I get the following 
> exception when sending a record:
> {noformat}
> java.lang.NullPointerException
>   at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
>   at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
> {noformat}
> But documentation says:
> {quote}
>  ... (a batch size of zero will disable batching entirely).
> {quote}
> Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-13 Thread Rajini Sivaram
+1 (non-binding)

On Mon, Feb 13, 2017 at 9:25 PM, Bill Bejeck  wrote:

> Sorry for the late response, +1
>
> On Mon, Feb 13, 2017 at 4:21 PM, Jun Rao  wrote:
>
> > Hi, Guozhang,
> >
> > Thanks for the proposal. I made a pass of the wiki and had the following
> > comments.
> >
> > 200. Message format:
> > 200.1 MaxTimestampDelta: Does that need to be delta since it's always a
> > fixed size in64?
> > 200.2 The wiki says "At the end we still maintains a message-level CRC".
> Is
> > that still valid?
> > 200.3 In the ProducerRequest, do we need messageSet size?
> > 200.4 One of the things that we may want to add in the future is KIP-82
> > (per record header). It would be useful to think a bit how easy it is to
> > support that with the new message format.
> >
> >
> > 201. Configurations:
> > 201.1 transaction.timeout.ms in the producer: It seems that it's missing
> > in
> > BeginTxnRequest? Also, what happens when the value is larger than
> > max.transaction.timeout.ms on the broker?
> > 201.2 For the internal transactional topic, do we need additional broker
> > side configurations to control # of partitions, # of replicas,
> compression
> > codec, segment size like the offset topic?
> > 201.3 isolation.level: It says the default is "all", but there is no
> option
> > for "all".
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Feb 1, 2017 at 8:13 PM, Guozhang Wang 
> wrote:
> >
> > > Hi all,
> > >
> > > We would like to start the voting process for KIP-98. The KIP can be
> > found
> > > at
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > Discussion thread can be found here:
> > >
> > > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> > > DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> > >
> > > Thanks,
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


[GitHub] kafka pull request #2540: KAFKA-4756: The auto-generated broker id should be...

2017-02-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2540


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-4756) The auto-generated broker id should be passed to MetricReporter.configure

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864515#comment-15864515
 ] 

ASF GitHub Bot commented on KAFKA-4756:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2540


> The auto-generated broker id should be passed to MetricReporter.configure
> -
>
> Key: KAFKA-4756
> URL: https://issues.apache.org/jira/browse/KAFKA-4756
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Colin P. McCabe
> Fix For: 0.10.2.0
>
>
> We currently pass the original config values and we initialise the metric 
> reporters in the constructor of `KafkaServer` while we get the generated 
> broker id in the `startup` method.
> We should also check if there's any other pluggable interface that suffers 
> from the same issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (KAFKA-4756) The auto-generated broker id should be passed to MetricReporter.configure

2017-02-13 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-4756.

   Resolution: Fixed
Fix Version/s: 0.10.2.0

Issue resolved by pull request 2540
[https://github.com/apache/kafka/pull/2540]

> The auto-generated broker id should be passed to MetricReporter.configure
> -
>
> Key: KAFKA-4756
> URL: https://issues.apache.org/jira/browse/KAFKA-4756
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Colin P. McCabe
> Fix For: 0.10.2.0
>
>
> We currently pass the original config values and we initialise the metric 
> reporters in the constructor of `KafkaServer` while we get the generated 
> broker id in the `startup` method.
> We should also check if there's any other pluggable interface that suffers 
> from the same issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-13 Thread Jun Rao
Hi, Mayuresh,

I seems to me that there are two common use cases of authorizer. (1) Use
the default SimpleAuthorizer and the kafka-acl to do authorization. (2) Use
a customized authorizer and an external tool for authorization. Do you
think there is a use case for a customized authorizer and kafka-acl at the
same time? If not, it's better not to complicate the kafka-acl api.

Thanks,

Jun



On Mon, Feb 13, 2017 at 10:35 AM, Mayuresh Gharat <
gharatmayures...@gmail.com> wrote:

> Hi Jun,
>
> Thanks for the review and comments. Please find the replies inline :
>
> This is so that in the future, we can extend to types like group.
> ---> Yep, I did think the same. But since the SocketServer was always
> creating User type, it wasn't actually used. If we go ahead with changes in
> this KIP, we will give this power of creating different Principal types to
> the PrincipalBuilder (which users can define there own). In that way Kafka
> will not have to deal with handling this. So the Principal building and
> Authorization will be opaque to Kafka which seems like an expected
> behavior.
>
>
> Hmm, normally, the configurations you specify for plug-ins refer to those
> needed to construct the plug-in object. So, it's kind of weird to use that
> to call a method. For example, why can't principalBuilderService.rest.url
> be passed in through the configure() method and the implementation can use
> that to build principal. This way, there is only a single method to compute
> the principal in a consistent way in the broker and in the kafka-acl tool.
> > We can do that as well. But since the rest url is not related to the
> Principal, it seems out of place to me to pass it every time we have to
> create a Principal. I should replace "principalConfigs" with
> "principalProperties".
> I was trying to differentiate the configs/properties that are used to
> create the PrincipalBuilder class and the Principal/Principals itself.
>
>
> For LinkedIn's use case, do you actually use the kafka-acl tool? My
> understanding is that LinkedIn does authorization through an external tool.
> > For Linkedin's use case we don't actually use the kafka-acl tool
> right now. As per the discussion that we had on
> https://issues.apache.org/jira/browse/KAFKA-4454, we thought that it would
> be good to make kafka-acl tool changes, to make it flexible and we might be
> even able to use it in future.
>
> It seems it's simpler if kafka-acl doesn't to need to understand the
> principal builder. The tool does authorization based on a string name,
> which is expected to match the principal name. So, I am wondering why the
> tool needs to know the principal builder.
> > If we don't make this change, I am not sure how clients/end users
> will be able to use this tool if they have there own Authorizer that does
> Authorization based on Principal, that has more information apart from name
> and type.
>
> What if we only make the following changes: pass the java principal in
> session and in
> SimpleAuthorizer, construct KafkaPrincipal from java principal name. Will
> that work for LinkedIn?
> > This can work for Linkedin but as explained above, it does not seem
> like a complete design from open source point of view.
>
> Thanks,
>
> Mayuresh
>
>
> On Thu, Feb 9, 2017 at 11:29 AM, Jun Rao  wrote:
>
> > Hi, Mayuresh,
> >
> > Thanks for the reply. A few more comments below.
> >
> > On Wed, Feb 8, 2017 at 9:14 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thanks for the review. Please find the responses inline.
> > >
> > > 1. It seems the problem that you are trying to address is that java
> > > principal returned from KafkaChannel may have additional fields than
> name
> > > that are needed during authorization. Have you considered a customized
> > > PrincipleBuilder that extracts all needed fields from java principal
> and
> > > squeezes them as a json in the name of the returned principal? Then,
> the
> > > authorizer can just parse the json and extract needed fields.
> > > ---> Yes we had thought about this. We use a third party library that
> > takes
> > > in the passed in cert and creates the Principal. This Principal is then
> > > used by the library to make the decision (ALLOW/DENY) when we call it
> in
> > > the Authorizer. It does not have an API to create the Principal from a
> > > String. If it did support, still we would have to be aware of the
> > internal
> > > details of the library, like the field values it creates from the
> certs,
> > > defaults and so on.
> > >
> > > 2. Could you explain how the default authorizer works now? Currently,
> the
> > > code just compares the two principal objects. Are we converting the
> java
> > > principal to a KafkaPrincipal there?
> > > ---> The SimpleAclAuthorizer currently expects that, the Principal it
> > > fetches from the Session object is an instance of KafkaPrincipal. It
> then
> > > uses it compare with the 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-13 Thread Bill Bejeck
Sorry for the late response, +1

On Mon, Feb 13, 2017 at 4:21 PM, Jun Rao  wrote:

> Hi, Guozhang,
>
> Thanks for the proposal. I made a pass of the wiki and had the following
> comments.
>
> 200. Message format:
> 200.1 MaxTimestampDelta: Does that need to be delta since it's always a
> fixed size in64?
> 200.2 The wiki says "At the end we still maintains a message-level CRC". Is
> that still valid?
> 200.3 In the ProducerRequest, do we need messageSet size?
> 200.4 One of the things that we may want to add in the future is KIP-82
> (per record header). It would be useful to think a bit how easy it is to
> support that with the new message format.
>
>
> 201. Configurations:
> 201.1 transaction.timeout.ms in the producer: It seems that it's missing
> in
> BeginTxnRequest? Also, what happens when the value is larger than
> max.transaction.timeout.ms on the broker?
> 201.2 For the internal transactional topic, do we need additional broker
> side configurations to control # of partitions, # of replicas, compression
> codec, segment size like the offset topic?
> 201.3 isolation.level: It says the default is "all", but there is no option
> for "all".
>
> Thanks,
>
> Jun
>
> On Wed, Feb 1, 2017 at 8:13 PM, Guozhang Wang  wrote:
>
> > Hi all,
> >
> > We would like to start the voting process for KIP-98. The KIP can be
> found
> > at
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> >
> > Discussion thread can be found here:
> >
> > http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> > DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
> >
> > Thanks,
> >
> > --
> > -- Guozhang
> >
>


[jira] [Updated] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-13 Thread Vipul Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vipul Singh updated KAFKA-4762:
---
Description: 
We were just recently hit by a weird error. 
Before going in any further, explaining of our service setup. we have a 
producer which produces messages not larger than 256 kb of messages( we have an 
explicit check about this on the producer side) and on the client side we have 
a fetch limit of 512kb(max.partition.fetch.bytes is set to 524288 bytes) 

Recently our client started to see this error:

{quote}
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages 
at [Partition=Offset]: {topic_name-0=9925056036} whose size is larger than the 
fetch size 524288 and hence cannot be ever returned. Increase the fetch size, 
or decrease the maximum message size the broker will allow.
{quote}

We tried consuming messages with another consumer, without any 
max.partition.fetch.bytes limit, and it consumed fine. The messages were small, 
and did not seem to be greater than 256 kb

We took a log dump, and the log size looked fine.
{quote}
mpresscodec: NoCompressionCodec crc: 2473548911 keysize: 8
offset: 9925056032 position: 191380053 isvalid: true payloadsize: 539 magic: 0 
compresscodec: NoCompressionCodec crc: 1656420267 keysize: 8
offset: 9925056033 position: 191380053 isvalid: true payloadsize: 1551 magic: 0 
compresscodec: NoCompressionCodec crc: 2398479758 keysize: 8
offset: 9925056034 position: 191380053 isvalid: true payloadsize: 1307 magic: 0 
compresscodec: NoCompressionCodec crc: 2845554215 keysize: 8
offset: 9925056035 position: 191380053 isvalid: true payloadsize: 1520 magic: 0 
compresscodec: NoCompressionCodec crc: 3106984195 keysize: 8
offset: 9925056036 position: 191713371 isvalid: true payloadsize: 1207 magic: 0 
compresscodec: NoCompressionCodec crc: 3462154435 keysize: 8
offset: 9925056037 position: 191713371 isvalid: true payloadsize: 418 magic: 0 
compresscodec: NoCompressionCodec crc: 1536701802 keysize: 8
offset: 9925056038 position: 191713371 isvalid: true payloadsize: 299 magic: 0 
compresscodec: NoCompressionCodec crc: 4112567543 keysize: 8
offset: 9925056039 position: 191713371 isvalid: true payloadsize: 1571 magic: 0 
compresscodec: NoCompressionCodec crc: 3696994307 keysize: 8
{quote}


Has anyone seen something similar? or any points to troubleshoot this further

Please Note: To overcome this issue, we deployed a new consumer, without this 
limit of max.partition.fetch.bytes, and it worked fine.


  was:
We were just recently hit by a weird error. 
Before going in any further, explaining of our service setup. we have a 
producer which produces messages not larger than 256 kb of messages( we have an 
explicit check about this on the producer side) and on the client side we have 
a fetch limit of 512kb(max.partition.fetch.bytes is set to 524288 bytes) 

Recently our client started to see this error:

{quote}
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages 
at [Partition=Offset]: {topic_name-0=9925080405} whose size is larger than the 
fetch size 524288 and hence cannot be ever returned. Increase the fetch size, 
or decrease the maximum message size the broker will allow.
{quote}

We tried consuming messages with another consumer, without any 
max.partition.fetch.bytes limit, and it consumed fine. The messages were small, 
and did not seem to be greater than 256 kb

We took a log dump, and the log size looked fine.

Has anyone seen something similar? or any points to troubleshoot this further

Please Note: To overcome this issue, we deployed a new consumer, without this 
limit of max.partition.fetch.bytes, and it worked fine.



> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vipul Singh
>
> We were just recently hit by a weird error. 
> Before going in any further, explaining of our service setup. we have a 
> producer which produces messages not larger than 256 kb of messages( we have 
> an explicit check about this on the producer side) and on the client side we 
> have a fetch limit of 512kb(max.partition.fetch.bytes is set to 524288 bytes) 
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925056036} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-13 Thread Jun Rao
Hi, Guozhang,

Thanks for the proposal. I made a pass of the wiki and had the following
comments.

200. Message format:
200.1 MaxTimestampDelta: Does that need to be delta since it's always a
fixed size in64?
200.2 The wiki says "At the end we still maintains a message-level CRC". Is
that still valid?
200.3 In the ProducerRequest, do we need messageSet size?
200.4 One of the things that we may want to add in the future is KIP-82
(per record header). It would be useful to think a bit how easy it is to
support that with the new message format.


201. Configurations:
201.1 transaction.timeout.ms in the producer: It seems that it's missing in
BeginTxnRequest? Also, what happens when the value is larger than
max.transaction.timeout.ms on the broker?
201.2 For the internal transactional topic, do we need additional broker
side configurations to control # of partitions, # of replicas, compression
codec, segment size like the offset topic?
201.3 isolation.level: It says the default is "all", but there is no option
for "all".

Thanks,

Jun

On Wed, Feb 1, 2017 at 8:13 PM, Guozhang Wang  wrote:

> Hi all,
>
> We would like to start the voting process for KIP-98. The KIP can be found
> at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>
> Discussion thread can be found here:
>
> http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
>
> Thanks,
>
> --
> -- Guozhang
>


[jira] [Updated] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-13 Thread Vipul Singh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vipul Singh updated KAFKA-4762:
---
Description: 
We were just recently hit by a weird error. 
Before going in any further, explaining of our service setup. we have a 
producer which produces messages not larger than 256 kb of messages( we have an 
explicit check about this on the producer side) and on the client side we have 
a fetch limit of 512kb(max.partition.fetch.bytes is set to 524288 bytes) 

Recently our client started to see this error:

{quote}
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages 
at [Partition=Offset]: {topic_name-0=9925080405} whose size is larger than the 
fetch size 524288 and hence cannot be ever returned. Increase the fetch size, 
or decrease the maximum message size the broker will allow.
{quote}

We tried consuming messages with another consumer, without any 
max.partition.fetch.bytes limit, and it consumed fine. The messages were small, 
and did not seem to be greater than 256 kb

We took a log dump, and the log size looked fine.

Has anyone seen something similar? or any points to troubleshoot this further

Please Note: To overcome this issue, we deployed a new consumer, without this 
limit of max.partition.fetch.bytes, and it worked fine.


  was:
We were just recently hit by a weird error. 
Before going in any further, explaining of our service setup. we have a 
producer which produces messages not larger than 256 kb of messages( we have an 
explicit check about this on the producer side) and on the client side we have 
a fetch limit of 512mb(max.partition.fetch.bytes is set to 524288 bytes) 

Recently our client started to see this error:

{quote}
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages 
at [Partition=Offset]: {topic_name-0=9925080405} whose size is larger than the 
fetch size 524288 and hence cannot be ever returned. Increase the fetch size, 
or decrease the maximum message size the broker will allow.
{quote}

We tried consuming messages with another consumer, without any 
max.partition.fetch.bytes limit, and it consumed fine. The messages were small, 
and did not seem to be greater than 256 kb

We took a log dump, and the log size looked fine.

Has anyone seen something similar? or any points to troubleshoot this further

Please Note: To overcome this issue, we deployed a new consumer, without this 
limit of max.partition.fetch.bytes, and it worked fine.



> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Reporter: Vipul Singh
>
> We were just recently hit by a weird error. 
> Before going in any further, explaining of our service setup. we have a 
> producer which produces messages not larger than 256 kb of messages( we have 
> an explicit check about this on the producer side) and on the client side we 
> have a fetch limit of 512kb(max.partition.fetch.bytes is set to 524288 bytes) 
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925080405} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it consumed fine. The messages were 
> small, and did not seem to be greater than 256 kb
> We took a log dump, and the log size looked fine.
> Has anyone seen something similar? or any points to troubleshoot this further
> Please Note: To overcome this issue, we deployed a new consumer, without this 
> limit of max.partition.fetch.bytes, and it worked fine.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Need suggestion on sending XML files via kafka

2017-02-13 Thread Prashanth Venkatesan
Hi Team,

I just started using Kafka. I have a usecase to send XML file or Document
object via Kafka topic using Java. Can you enlight me with the guidance
steps to achieve it??

Please apologize and ignore if I am posting to inappropriate mail address.

Thanks
Prashanth
+91-9677103475
India


[jira] [Updated] (KAFKA-3264) Mark the old Scala consumer and related classes as deprecated

2017-02-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-3264:
---
Status: Patch Available  (was: Open)

> Mark the old Scala consumer and related classes as deprecated
> -
>
> Key: KAFKA-3264
> URL: https://issues.apache.org/jira/browse/KAFKA-3264
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Grant Henke
>Assignee: Vahid Hashemian
> Fix For: 0.10.3.0
>
>
> Once the new consumer is out of beta, we should consider deprecating the old 
> Scala consumers to encourage use of the new consumer and facilitate the 
> removal of the old consumers.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2857) ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when describing a non-existent group before the offset topic is created

2017-02-13 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2857:
---
Status: Patch Available  (was: Open)

> ConsumerGroupCommand throws GroupCoordinatorNotAvailableException when 
> describing a non-existent group before the offset topic is created
> -
>
> Key: KAFKA-2857
> URL: https://issues.apache.org/jira/browse/KAFKA-2857
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Ismael Juma
>Assignee: Vahid Hashemian
>Priority: Minor
>
> If we describe a non-existing group before the offset topic is created, like 
> the following:
> {code}
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer 
> --describe --group 
> {code}
> We get the following error:
> {code}
> Error while executing consumer group command The group coordinator is not 
> available.
> org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The 
> group coordinator is not available.
> {code}
> The exception is thrown in the `adminClient.describeConsumerGroup` call. We 
> can't interpret this exception as meaning that the group doesn't exist 
> because it could also be thrown f all replicas for a offset topic partition 
> are down (as explained by Jun).
> Jun also suggested that we should distinguish if a coordinator is not 
> available from the case where a coordinator doesn't exist.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Guozhang Wang
@Ian, Nina

Thanks for the detailed description of your apps. A couple of follow-up
questions I have to help us further investigate your issue:

1. About the stack trace pasted before: is it tailing some warning logs
like "Could not create task ... Will retry" (i.e. it is part of that
warning log) or it is thrown as a exception all they way to the
thread's setUncaughtExceptionHandler,
in which case your code should have a `Unexpected Exception caught in
thread` error log entry and the thread will die.

2. Could you monitor the number of live stream threads during the runtime
(originally there should be 8 according to your configs) and see if the
count has decreased, meaning some threads have been dead?

3. When you reported that the process "just hang on attempting to rejoin",
how long have you observed it hanging before killed the process?

4. Could you try with fewer number of partitions, from 96 to 8 just for the
purpose of trouble shooting to see if this issue still exists?


Guozhang


On Mon, Feb 13, 2017 at 11:38 AM, Guozhang Wang  wrote:

> Thanks for reporting the JIRA Swen.
>
> Jason has a patch ready under KAFKA-4761 and I have reviewed it. You could
> try it out and see if it has fixed your issue.
>
> After this is merged in, we will need another RC.
>
>
> Guozhang
>
> On Mon, Feb 13, 2017 at 9:52 AM, Moczarski, Swen  kleinanzeigen.de> wrote:
>
>> +0 (non-binding)
>>
>> Thanks for compiling a new release candidate.
>>
>> I get an NullPointerException when setting batch.size=0 on producer
>> config. This worked before with 0.10.1.1.
>> See https://issues.apache.org/jira/browse/KAFKA-4761
>>
>> Regards,
>> Swen
>>
>> Am 2/10/17, 5:51 PM schrieb "Ewen Cheslack-Postava" :
>>
>> Hello Kafka users, developers and client-developers,
>>
>> This is RC1 for release of Apache Kafka 0.10.2.0.
>>
>> This is a minor version release of Apache Kafka. It includes 19 new
>> KIPs.
>> See the release notes and release plan (https://cwiki.apache.org/
>> confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A
>> few
>> feature highlights: SASL-SCRAM support, improved client compatibility
>> to
>> allow use of clients newer than the broker, session windows and global
>> tables in the Kafka Streams API, single message transforms in the
>> Kafka
>> Connect framework.
>>
>> Important note: in addition to the artifacts generated using JDK7 for
>> Scala
>> 2.10 and 2.11, this release also includes experimental artifacts built
>> using JDK8 for Scala 2.12.
>>
>> Important code changes since RC0 (non-docs, non system tests):
>>
>> * KAFKA-4728; KafkaConsumer#commitSync should copy its input
>> * KAFKA-4441; Monitoring incorrect during topic creation and deletion
>> * KAFKA-4734; Trim the time index on old segments
>> * KAFKA-4725; Stop leaking messages in produce request body when
>> requests
>> are delayed
>> * KAFKA-4716: Fix case when controller cannot be reached
>>
>> Release notes for the 0.10.2.0 release:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/
>>
>> * Javadoc:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
>>
>> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
>> e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
>>
>>
>> * Documentation:
>> http://kafka.apache.org/0102/documentation.html
>>
>> * Protocol:
>> http://kafka.apache.org/0102/protocol.html
>>
>> * Successful Jenkins builds for the 0.10.2 branch:
>> Unit/integration tests: https://builds.apache.org/job/
>> kafka-0.10.2-jdk7/74/
>> System tests: https://jenkins.confluent.io/j
>> ob/system-test-kafka-0.10.2/25/
>>
>> /**
>>
>> Thanks,
>> Ewen
>>
>>
>>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-13 Thread Colin McCabe
On Sun, Feb 12, 2017, at 09:21, Jay Kreps wrote:
> Hey Colin,
> 
> Thanks for the hard work on this. I know going back and forth on APIs is
> kind of frustrating but we're at the point where these things live long
> enough and are used by enough people that it is worth the pain. I'm sure
> it'll come down in the right place eventually. A couple things I've found
> helped in the past:
> 
>1. The burden of evidence needs to fall on the complicator. i.e. if
>person X thinks the api should be async they need to produce a set of
>common use cases that require this. Otherwise you are perpetually
>having to
>think "we might need x". I think it is good to have a rule of "simple
>until
>proven insufficient".
>2. Make sure we frame things for the intended audience. At this point
>our apis get used by a very broad set of Java engineers. This is a
>very
>different audience from our developer mailing list. These people code
>for a
>living not necessarily as a passion, and may not understand details of
>the
>internals of our system or even basic things like multi-threaded
>programming. I don't think this means we want to dumb things down, but
>rather try really hard to make things truly simple when possible.
> 
> Okay here were a couple of comments:
> 
>1. Conceptually what is a TopicContext? I think it means something
>like
>TopicAdmin? It is not literally context about Topics right? What is
>the
>relationship of Contexts to clients? Is there a threadsafety
>difference?
>Would be nice to not have to think about this, this is what I mean by
>"conceptual weight". We introduce a new concept that is a bit nebulous
>that
>I have to figure out to use what could be a simple api. I'm sure
>you've
>been through this experience before where you have these various
>objects
>and you're trying to figure out what they represent (the connection to
>the
>server? the information to create a connection? a request session?).

The intention was to provide some grouping of methods, and also a place
to put request parameters which were often set to defaults rather than
being explicitly set.  If it seems complex, we can certainly get rid of
it.

>2. We've tried to avoid the Impl naming convention. In general the
>rule
>has been if there is only going to be one implementation you don't
>need an
>interface. If there will be multiple, distinguish it from the others.
>The
>other clients follow this pattern: Producer, KafkaProducer,
>MockProducer;
>Consumer, KafkaConsumer, MockConsumer.

Good point.  Let's change the interface to KafkaAdminClient, and the
implementation to AdminClient.

>3. We generally don't use setters or getters as a naming convention. I
>personally think mutating the setting in place seems kind of like late
>90s
>Java style. I think it likely has thread-safety issues. i.e. even if
>it is
>volatile you may not get the value you just set if there is another
>thread... I actually really liked what you described as your original
>idea
>of having a single parameter object like CreateTopicRequest that holds
>all
>these parameters and defaults. This lets you evolve the api with all
>the
>various combinations of arguments without overloading insanity. After
>doing
>literally tens of thousands of remote APIs at LinkedIn we eventually
>converged on a rule, which is ultimately every remote api needs a
>single
>argument object you can add to over time and it must be batched. Which
>brings me to my next point...

Just to clarify, volatiles were never a part of the proposal.  I think
that context objects or request objects should be used by a single
thread at a time.

I'm not opposed to request objects, but I think they raise all the same
questions as context objects.  Basically, the thread-safety issues need
to be spelled out and understood by the user, and the user needs more
lines of code to make a request.  And there will be people trying to do
things like re-use request objects when they should not, and so forth.

>4. I agree batch apis are annoying but I suspect we'll end up adding
>one. Doing 1000 requests for 1000 operations if creating or deleting
>will
>be bad, right? This won't be the common case, but when you do it it
>will be
>a deal-breaker problem. I don't think we should try to fix this one
>behind
>the scenes.
>5. Are we going to do CompletableFuture (which requires java 8) or
>normal Future? Normal Future is utterly useless for most things other
>than
>just calling wait. If we can evolve in place from Future to
>CompletableFuture that is fantastic (we could do it for the producer
>too!).
>My belief was that this was binary incompatible but I actually don't
>know
>(obviously it's source compatible).

In my testing, 

[jira] [Created] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-13 Thread Vipul Singh (JIRA)
Vipul Singh created KAFKA-4762:
--

 Summary: Consumer throwing RecordTooLargeException even when 
messages are not that large
 Key: KAFKA-4762
 URL: https://issues.apache.org/jira/browse/KAFKA-4762
 Project: Kafka
  Issue Type: Bug
Reporter: Vipul Singh


We were just recently hit by a weird error. 
Before going in any further, explaining of our service setup. we have a 
producer which produces messages not larger than 256 kb of messages( we have an 
explicit check about this on the producer side) and on the client side we have 
a fetch limit of 512mb(max.partition.fetch.bytes is set to 524288 bytes) 

Recently our client started to see this error:

{quote}
org.apache.kafka.common.errors.RecordTooLargeException: There are some messages 
at [Partition=Offset]: {topic_name-0=9925080405} whose size is larger than the 
fetch size 524288 and hence cannot be ever returned. Increase the fetch size, 
or decrease the maximum message size the broker will allow.
{quote}

We tried consuming messages with another consumer, without any 
max.partition.fetch.bytes limit, and it consumed fine. The messages were small, 
and did not seem to be greater than 256 kb

We took a log dump, and the log size looked fine.

Has anyone seen something similar? or any points to troubleshoot this further

Please Note: To overcome this issue, we deployed a new consumer, without this 
limit of max.partition.fetch.bytes, and it worked fine.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Guozhang Wang
Thanks for reporting the JIRA Swen.

Jason has a patch ready under KAFKA-4761 and I have reviewed it. You could
try it out and see if it has fixed your issue.

After this is merged in, we will need another RC.


Guozhang

On Mon, Feb 13, 2017 at 9:52 AM, Moczarski, Swen <
smoczar...@ebay-kleinanzeigen.de> wrote:

> +0 (non-binding)
>
> Thanks for compiling a new release candidate.
>
> I get an NullPointerException when setting batch.size=0 on producer
> config. This worked before with 0.10.1.1.
> See https://issues.apache.org/jira/browse/KAFKA-4761
>
> Regards,
> Swen
>
> Am 2/10/17, 5:51 PM schrieb "Ewen Cheslack-Postava" :
>
> Hello Kafka users, developers and client-developers,
>
> This is RC1 for release of Apache Kafka 0.10.2.0.
>
> This is a minor version release of Apache Kafka. It includes 19 new
> KIPs.
> See the release notes and release plan (https://cwiki.apache.org/
> confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A
> few
> feature highlights: SASL-SCRAM support, improved client compatibility
> to
> allow use of clients newer than the broker, session windows and global
> tables in the Kafka Streams API, single message transforms in the Kafka
> Connect framework.
>
> Important note: in addition to the artifacts generated using JDK7 for
> Scala
> 2.10 and 2.11, this release also includes experimental artifacts built
> using JDK8 for Scala 2.12.
>
> Important code changes since RC0 (non-docs, non system tests):
>
> * KAFKA-4728; KafkaConsumer#commitSync should copy its input
> * KAFKA-4441; Monitoring incorrect during topic creation and deletion
> * KAFKA-4734; Trim the time index on old segments
> * KAFKA-4725; Stop leaking messages in produce request body when
> requests
> are delayed
> * KAFKA-4716: Fix case when controller cannot be reached
>
> Release notes for the 0.10.2.0 release:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
>
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
>
>
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
>
> * Successful Jenkins builds for the 0.10.2 branch:
> Unit/integration tests: https://builds.apache.org/job/
> kafka-0.10.2-jdk7/74/
> System tests: https://jenkins.confluent.io/
> job/system-test-kafka-0.10.2/25/
>
> /**
>
> Thanks,
> Ewen
>
>
>


-- 
-- Guozhang


[jira] [Updated] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4761:
---
Priority: Blocker  (was: Minor)

> NullPointerException if batch.size=0 for producer config
> 
>
> Key: KAFKA-4761
> URL: https://issues.apache.org/jira/browse/KAFKA-4761
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
> Environment: jdk1.8.0_40
>Reporter: Swen Moczarski
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 0.10.2.0
>
> Attachments: KafkaProducerTest.java
>
>
> When setting {{batch.size}} to {{0}} for producer I get the following 
> exception when sending a record:
> {noformat}
> java.lang.NullPointerException
>   at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
>   at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
> {noformat}
> But documentation says:
> {quote}
>  ... (a batch size of zero will disable batching entirely).
> {quote}
> Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4761:
---
Fix Version/s: 0.10.2.0

> NullPointerException if batch.size=0 for producer config
> 
>
> Key: KAFKA-4761
> URL: https://issues.apache.org/jira/browse/KAFKA-4761
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
> Environment: jdk1.8.0_40
>Reporter: Swen Moczarski
>Assignee: Jason Gustafson
>Priority: Minor
> Fix For: 0.10.2.0
>
> Attachments: KafkaProducerTest.java
>
>
> When setting {{batch.size}} to {{0}} for producer I get the following 
> exception when sending a record:
> {noformat}
> java.lang.NullPointerException
>   at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
>   at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
> {noformat}
> But documentation says:
> {quote}
>  ... (a batch size of zero will disable batching entirely).
> {quote}
> Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-4761:
---
Status: Patch Available  (was: In Progress)

https://github.com/apache/kafka/pull/2545/files

> NullPointerException if batch.size=0 for producer config
> 
>
> Key: KAFKA-4761
> URL: https://issues.apache.org/jira/browse/KAFKA-4761
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
> Environment: jdk1.8.0_40
>Reporter: Swen Moczarski
>Assignee: Jason Gustafson
>Priority: Minor
> Attachments: KafkaProducerTest.java
>
>
> When setting {{batch.size}} to {{0}} for producer I get the following 
> exception when sending a record:
> {noformat}
> java.lang.NullPointerException
>   at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
>   at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
> {noformat}
> But documentation says:
> {quote}
>  ... (a batch size of zero will disable batching entirely).
> {quote}
> Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4671) Fix Streams window retention policy

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864247#comment-15864247
 ] 

ASF GitHub Bot commented on KAFKA-4671:
---

GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2545

KAFKA-4671: Fix producer regression handling small or zero batch size



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-4761

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2545.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2545


commit d20a5d317ae66fa17e9ebf728a96db03e53970c6
Author: Jason Gustafson 
Date:   2017-02-13T19:15:38Z

KAFKA-4671: Fix producer regression handling small or zero batch size




> Fix Streams window retention policy
> ---
>
> Key: KAFKA-4671
> URL: https://issues.apache.org/jira/browse/KAFKA-4671
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
> Fix For: 0.10.2.0, 0.10.3.0
>
>
> Windows should consider window type and window spec to guarantee valid 
> minimum retention time.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2545: KAFKA-4671: Fix producer regression handling small...

2017-02-13 Thread hachikuji
GitHub user hachikuji opened a pull request:

https://github.com/apache/kafka/pull/2545

KAFKA-4671: Fix producer regression handling small or zero batch size



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hachikuji/kafka KAFKA-4761

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2545.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2545


commit d20a5d317ae66fa17e9ebf728a96db03e53970c6
Author: Jason Gustafson 
Date:   2017-02-13T19:15:38Z

KAFKA-4671: Fix producer regression handling small or zero batch size




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Comment Edited] (KAFKA-1379) Partition reassignment resets clock for time-based retention

2017-02-13 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864128#comment-15864128
 ] 

Andrew Olson edited comment on KAFKA-1379 at 2/13/17 6:43 PM:
--

[~hachikuji] Jason, could you confirm if this bug has been fixed? According to 
http://kafka.apache.org/documentation.html#upgrade_10_1_breaking it appears so.


was (Author: noslowerdna):
[~hachikuji] Jason, could you confirm if this bug has been fixed?

> Partition reassignment resets clock for time-based retention
> 
>
> Key: KAFKA-1379
> URL: https://issues.apache.org/jira/browse/KAFKA-1379
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Joel Koshy
>
> Since retention is driven off mod-times reassigned partitions will result in
> data that has been on a leader to be retained for another full retention
> cycle. E.g., if retention is seven days and you reassign partitions on the
> sixth day then those partitions will remain on the replicas for another
> seven days.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-13 Thread Mayuresh Gharat
Hi Jun,

Thanks for the review and comments. Please find the replies inline :

This is so that in the future, we can extend to types like group.
---> Yep, I did think the same. But since the SocketServer was always
creating User type, it wasn't actually used. If we go ahead with changes in
this KIP, we will give this power of creating different Principal types to
the PrincipalBuilder (which users can define there own). In that way Kafka
will not have to deal with handling this. So the Principal building and
Authorization will be opaque to Kafka which seems like an expected behavior.


Hmm, normally, the configurations you specify for plug-ins refer to those
needed to construct the plug-in object. So, it's kind of weird to use that
to call a method. For example, why can't principalBuilderService.rest.url
be passed in through the configure() method and the implementation can use
that to build principal. This way, there is only a single method to compute
the principal in a consistent way in the broker and in the kafka-acl tool.
> We can do that as well. But since the rest url is not related to the
Principal, it seems out of place to me to pass it every time we have to
create a Principal. I should replace "principalConfigs" with
"principalProperties".
I was trying to differentiate the configs/properties that are used to
create the PrincipalBuilder class and the Principal/Principals itself.


For LinkedIn's use case, do you actually use the kafka-acl tool? My
understanding is that LinkedIn does authorization through an external tool.
> For Linkedin's use case we don't actually use the kafka-acl tool
right now. As per the discussion that we had on
https://issues.apache.org/jira/browse/KAFKA-4454, we thought that it would
be good to make kafka-acl tool changes, to make it flexible and we might be
even able to use it in future.

It seems it's simpler if kafka-acl doesn't to need to understand the
principal builder. The tool does authorization based on a string name,
which is expected to match the principal name. So, I am wondering why the
tool needs to know the principal builder.
> If we don't make this change, I am not sure how clients/end users
will be able to use this tool if they have there own Authorizer that does
Authorization based on Principal, that has more information apart from name
and type.

What if we only make the following changes: pass the java principal in
session and in
SimpleAuthorizer, construct KafkaPrincipal from java principal name. Will
that work for LinkedIn?
> This can work for Linkedin but as explained above, it does not seem
like a complete design from open source point of view.

Thanks,

Mayuresh


On Thu, Feb 9, 2017 at 11:29 AM, Jun Rao  wrote:

> Hi, Mayuresh,
>
> Thanks for the reply. A few more comments below.
>
> On Wed, Feb 8, 2017 at 9:14 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > Thanks for the review. Please find the responses inline.
> >
> > 1. It seems the problem that you are trying to address is that java
> > principal returned from KafkaChannel may have additional fields than name
> > that are needed during authorization. Have you considered a customized
> > PrincipleBuilder that extracts all needed fields from java principal and
> > squeezes them as a json in the name of the returned principal? Then, the
> > authorizer can just parse the json and extract needed fields.
> > ---> Yes we had thought about this. We use a third party library that
> takes
> > in the passed in cert and creates the Principal. This Principal is then
> > used by the library to make the decision (ALLOW/DENY) when we call it in
> > the Authorizer. It does not have an API to create the Principal from a
> > String. If it did support, still we would have to be aware of the
> internal
> > details of the library, like the field values it creates from the certs,
> > defaults and so on.
> >
> > 2. Could you explain how the default authorizer works now? Currently, the
> > code just compares the two principal objects. Are we converting the java
> > principal to a KafkaPrincipal there?
> > ---> The SimpleAclAuthorizer currently expects that, the Principal it
> > fetches from the Session object is an instance of KafkaPrincipal. It then
> > uses it compare with the KafkaPrincipal extracted from the stored ACLs.
> In
> > this case, we can construct the KafkaPrincipal object on the fly by using
> > the name of the Principal as follows :
> >
> > *val principal = session.principal*
> > *val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE,
> > principal.getName)*
> > I was also planning to get rid of the principalType field in
> > KafkaPrincipal as
> > it is always set to *"*User*"* in the SocketServer currently. After this
> > KIP, it will no longer be used in SocketServer. But to maintain backwards
> > compatibility of kafka-acls.sh, I preserved it.
> >
> >
> >
> This is so that in the future, we can extend to types like 

Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-13 Thread Dong Lin
+1 (non-binding)

On Mon, Feb 13, 2017 at 10:21 AM, Harsha Chintalapani 
wrote:

> +1.
> -Harsha
>
> On Fri, Feb 10, 2017 at 11:12 PM Manikumar 
> wrote:
>
> > Yes, owners and the renewers can always describe their own tokens.
> Updated
> > the KIP.
> >
> > On Sat, Feb 11, 2017 at 3:12 AM, Jun Rao  wrote:
> >
> > > Hi, Mani,
> > >
> > > Thanks for the update. Just a minor comment below. Otherwise, +1 from
> me.
> > >
> > >
> > > >
> > > > >
> > > > > 116. Could you document the ACL rules associated with those new
> > > requests?
> > > > > For example, do we allow any one to create, delete, describe
> > delegation
> > > > > tokens?
> > > > >
> > > > >
> > > > Currently we only allow a owner to create delegation token for that
> > owner
> > > > only.
> > > > Any thing the owner has permission to do, delegation tokens should be
> > > > allowed to do as well. We can also check renew and expire requests
> are
> > > > coming
> > > > from owner or renewers of the token. So we may not need ACLs for
> > > > create/renew/expire requests.
> > > >
> > > > For describe, we can add DESCRIBE operation on TOKEN Resource. In
> > future,
> > > > when we extend
> > > > the support to allow a user to acquire delegation tokens for other
> > users,
> > > > then we can enable
> > > > CREATE/DELETE operations. Updated the KIP.
> > > >
> > > >
> > > This sounds good. I guess the owner and the renewer can always describe
> > > their own tokens?
> > >
> > > Jun
> > >
> >
>


Re: [VOTE] KIP-48 Support for delegation tokens as an authentication mechanism

2017-02-13 Thread Harsha Chintalapani
+1.
-Harsha

On Fri, Feb 10, 2017 at 11:12 PM Manikumar 
wrote:

> Yes, owners and the renewers can always describe their own tokens. Updated
> the KIP.
>
> On Sat, Feb 11, 2017 at 3:12 AM, Jun Rao  wrote:
>
> > Hi, Mani,
> >
> > Thanks for the update. Just a minor comment below. Otherwise, +1 from me.
> >
> >
> > >
> > > >
> > > > 116. Could you document the ACL rules associated with those new
> > requests?
> > > > For example, do we allow any one to create, delete, describe
> delegation
> > > > tokens?
> > > >
> > > >
> > > Currently we only allow a owner to create delegation token for that
> owner
> > > only.
> > > Any thing the owner has permission to do, delegation tokens should be
> > > allowed to do as well. We can also check renew and expire requests are
> > > coming
> > > from owner or renewers of the token. So we may not need ACLs for
> > > create/renew/expire requests.
> > >
> > > For describe, we can add DESCRIBE operation on TOKEN Resource. In
> future,
> > > when we extend
> > > the support to allow a user to acquire delegation tokens for other
> users,
> > > then we can enable
> > > CREATE/DELETE operations. Updated the KIP.
> > >
> > >
> > This sounds good. I guess the owner and the renewer can always describe
> > their own tokens?
> >
> > Jun
> >
>


[jira] [Commented] (KAFKA-1379) Partition reassignment resets clock for time-based retention

2017-02-13 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15864128#comment-15864128
 ] 

Andrew Olson commented on KAFKA-1379:
-

[~hachikuji] Jason, could you confirm if this bug has been fixed?

> Partition reassignment resets clock for time-based retention
> 
>
> Key: KAFKA-1379
> URL: https://issues.apache.org/jira/browse/KAFKA-1379
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Reporter: Joel Koshy
>
> Since retention is driven off mod-times reassigned partitions will result in
> data that has been on a leader to be retained for another full retention
> cycle. E.g., if retention is seven days and you reassign partitions on the
> sixth day then those partitions will remain on the replicas for another
> seven days.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4761 started by Jason Gustafson.
--
> NullPointerException if batch.size=0 for producer config
> 
>
> Key: KAFKA-4761
> URL: https://issues.apache.org/jira/browse/KAFKA-4761
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
> Environment: jdk1.8.0_40
>Reporter: Swen Moczarski
>Assignee: Jason Gustafson
>Priority: Minor
> Attachments: KafkaProducerTest.java
>
>
> When setting {{batch.size}} to {{0}} for producer I get the following 
> exception when sending a record:
> {noformat}
> java.lang.NullPointerException
>   at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
>   at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
> {noformat}
> But documentation says:
> {quote}
>  ... (a batch size of zero will disable batching entirely).
> {quote}
> Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Moczarski, Swen
+0 (non-binding)

Thanks for compiling a new release candidate.

I get an NullPointerException when setting batch.size=0 on producer config. 
This worked before with 0.10.1.1.
See https://issues.apache.org/jira/browse/KAFKA-4761

Regards,
Swen

Am 2/10/17, 5:51 PM schrieb "Ewen Cheslack-Postava" :

Hello Kafka users, developers and client-developers,

This is RC1 for release of Apache Kafka 0.10.2.0.

This is a minor version release of Apache Kafka. It includes 19 new KIPs.
See the release notes and release plan (https://cwiki.apache.org/
confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A few
feature highlights: SASL-SCRAM support, improved client compatibility to
allow use of clients newer than the broker, session windows and global
tables in the Kafka Streams API, single message transforms in the Kafka
Connect framework.

Important note: in addition to the artifacts generated using JDK7 for Scala
2.10 and 2.11, this release also includes experimental artifacts built
using JDK8 for Scala 2.12.

Important code changes since RC0 (non-docs, non system tests):

* KAFKA-4728; KafkaConsumer#commitSync should copy its input
* KAFKA-4441; Monitoring incorrect during topic creation and deletion
* KAFKA-4734; Trim the time index on old segments
* KAFKA-4725; Stop leaking messages in produce request body when requests
are delayed
* KAFKA-4716: Fix case when controller cannot be reached

Release notes for the 0.10.2.0 release:
http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html

*** Please download, test and vote by Monday, Feb 13, 5pm PT ***

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/

* Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e825b7994bf8c8c4871d1e0973e287e6d31c7ae4


* Documentation:
http://kafka.apache.org/0102/documentation.html

* Protocol:
http://kafka.apache.org/0102/protocol.html

* Successful Jenkins builds for the 0.10.2 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/74/
System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/25/

/**

Thanks,
Ewen




[jira] [Assigned] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-4761:
--

Assignee: Jason Gustafson

> NullPointerException if batch.size=0 for producer config
> 
>
> Key: KAFKA-4761
> URL: https://issues.apache.org/jira/browse/KAFKA-4761
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.0
> Environment: jdk1.8.0_40
>Reporter: Swen Moczarski
>Assignee: Jason Gustafson
>Priority: Minor
> Attachments: KafkaProducerTest.java
>
>
> When setting {{batch.size}} to {{0}} for producer I get the following 
> exception when sending a record:
> {noformat}
> java.lang.NullPointerException
>   at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
>   at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
> {noformat}
> But documentation says:
> {quote}
>  ... (a batch size of zero will disable batching entirely).
> {quote}
> Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Correct prefetching of data to KTable-like structure on application startup

2017-02-13 Thread Matthias J. Sax
Jan,

brokers with version 0.10.1 or higher allow to set both topic cleanup
policies in combination:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist

However, this will only delete data in you changelog topic but not in
your RocksDB -- if you want to get data delete in RocksDB, you would
need to send tombstone messages for those keys. It's kinda tricky to get
this done.

An "brute force" alternative would be, stop the application, delete the
local state directory, and restart. This will force Streams to recreate
the RocksDB files from the changelog and thus only loading keys that got
not deleted. But this is of course a quite expensive approach and you
should be very careful about using it.


-Matthias


On 2/13/17 12:25 AM, Jan Lukavský wrote:
> Hi Michael,
> 
> sorry for my late answer. Configuring the topic as you suggest is one
> option (and I will configure it that way), but I wanted to combine the
> two data retention mechanisms (if possible). I would like to use log
> compaction, so that I will always get at least the last message for
> given key, but I would also like to use the classical temporal data
> retention, which would function as a sort of TTL for the keys - if a key
> doesn't get an update for the configured period of time, if could be
> removed. That way I could ensure that out-dated keys could be removed.
> 
> Is there any other option for this? And can kafka be configured this way?
> 
> Best,
> 
>  Jan
> 
> On 02/09/2017 12:08 PM, Michael Noll wrote:
>> Jan,
>>
>>>   - if I don't send any data to a kafka partition for a period longer
>>> then
>> the data retention interval, then all data from the partition is wiped
>> out
>>
>> If I interpret your first and second message in this email thread
>> correctly, then you are talking only about your "state topic" here, i.e.
>> the topic that you read into a KTable.  You should configure this
>> topic to
>> use log compaction, which will ensure that the latest value for a
>> given key
>> will never be wiped.  So even if you don't send any data to a Kafka
>> partition of this (now log-compacted) "state topic" for a long period of
>> time, you'd always have access to (at least) the latest value for
>> every key.
>>
>> Would that help?
>>
>> -Michael
>>
>>
>>
>>
>>
>> On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský  wrote:
>>
>>> Hi Matthias,
>>>
>>> first of all, thanks for your answer. Sorry if I didn't explain the
>>> problem well, I didn't want to dig too much into detail to focus on the
>>> important and maybe the result was not clear.
>>>
>>> My fault, I will try to explain in again. I have two KafkaConsumers
>>> in two
>>> separate threads consuming from two topics - let's call the first one
>>> "stream topic" (processed like KStream)
>>>
>>> and the second one "state topic" (processed like KTable). The state
>>> topic
>>> carries a persistent data that I need in order to process the stream
>>> topic,
>>> so I need to cache the state topic
>>>
>>> locally before starting consumption of the stream topic. When the
>>> application is running normally, there seems to be no issue with this,
>>>
>>> because the state topic is updated asynchronously and I use internal
>>> locks
>>> to synchronize the processing inside the application. So far,
>>> everything is
>>> fine.
>>>
>>>
>>> The problem might arise when the application starts - then I do the
>>> following:
>>>
>>>   - lock processing of the stream topic (because I don't have the state
>>> topic cached)
>>>
>>>   - read the current offset N from the state topic (which gives me
>>> offsets
>>> of a message that should be expected next, that is message that has
>>> not yet
>>> been written)
>>>
>>>   - reset offset of the state topic to beginning and read it until I
>>> read
>>> offset N - 1, which tells me that I have cached all the data I need to
>>> process the stream topic, so I unlock the stream processing and continue
>>>
>>> All this works well, except for some very rare situation, when the
>>> following happens (as I understand it, maybe here I am making some
>>> mistake):
>>>
>>>   - for a long period of time there is no update to (at least single
>>> partition) of the state topic
>>>
>>>   - when I try to cache the state topic during startup as explained
>>> above,
>>> it might never finish, because I will never get a message with offset
>>> N - 1
>>> - that is because I will not get any message at all, because all of the
>>> data has been wiped out
>>>
>>>   - because I don't know if I get all the data from the state topic, I
>>> cannot start processing the stream topic and the whole application is
>>> stuck, until first message arrives into all partition of the state topic
>>> (which might even never happen)
>>>
>>>   - I might use some sort of timeout to handle this, but this could be
>>> dangerous, relying on KafkaConsumer.poll() returning empty records
>>> sounds
>>> to me a little fragile too (because 

Re: KafkaStream Run on 0.9.0.1

2017-02-13 Thread Matthias J. Sax
No.

If you want to use Kafka Streams, you need brokers with the same or
higher version number. Thus, you need at least 0.10 brokers.

This restriction will be relaxed with upcoming 0.10.2 release that is
the first release being broker backward compatible. Thus, you can run a
0.10.2 Streams app also on 0.10.1 brokers (ie, the backward
compatibility is also limited...)


-Matthias


On 2/12/17 11:00 PM, Abhinav Patil wrote:
> Hi,
> 
> Is kafka stream run on 0.9.0.1 ?
> 



signature.asc
Description: OpenPGP digital signature


[jira] [Assigned] (KAFKA-4494) Significant startup delays in KStreams app

2017-02-13 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy reassigned KAFKA-4494:
-

Assignee: Damian Guy

> Significant startup delays in KStreams app
> --
>
> Key: KAFKA-4494
> URL: https://issues.apache.org/jira/browse/KAFKA-4494
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: AWS Linux ami, mac os
>Reporter: j yeargers
>Assignee: Damian Guy
>  Labels: performance
>
> Often starting a KStreams based app results in significant (5-10 minutes) 
> delay before processing of stream begins. 
> Sample debug output: 
> https://gist.github.com/jyeargers/e8398fb353d67397f99148bc970479ee
> Topology in question: stream -> map -> groupbykey.aggregate -> print
> Stream is JSON.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (KAFKA-4494) Significant startup delays in KStreams app

2017-02-13 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on KAFKA-4494 started by Damian Guy.
-
> Significant startup delays in KStreams app
> --
>
> Key: KAFKA-4494
> URL: https://issues.apache.org/jira/browse/KAFKA-4494
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
> Environment: AWS Linux ami, mac os
>Reporter: j yeargers
>Assignee: Damian Guy
>  Labels: performance
>
> Often starting a KStreams based app results in significant (5-10 minutes) 
> delay before processing of stream begins. 
> Sample debug output: 
> https://gist.github.com/jyeargers/e8398fb353d67397f99148bc970479ee
> Topology in question: stream -> map -> groupbykey.aggregate -> print
> Stream is JSON.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4761) NullPointerException if batch.size=0 for producer config

2017-02-13 Thread Swen Moczarski (JIRA)
Swen Moczarski created KAFKA-4761:
-

 Summary: NullPointerException if batch.size=0 for producer config
 Key: KAFKA-4761
 URL: https://issues.apache.org/jira/browse/KAFKA-4761
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.2.0
 Environment: jdk1.8.0_40
Reporter: Swen Moczarski
Priority: Minor
 Attachments: KafkaProducerTest.java

When setting {{batch.size}} to {{0}} for producer I get the following exception 
when sending a record:

{noformat}
java.lang.NullPointerException
at org.apache.kafka.common.utils.Utils.notNull(Utils.java:315)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:197)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:478)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:440)
at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:360)
at KafkaProducerTest.exposeNpeOnBatchSizeZero(KafkaProducerTest.java:21)
{noformat}

But documentation says:
{quote}
 ... (a batch size of zero will disable batching entirely).
{quote}

Please, see attached test.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


KafkaStream Run on 0.9.0.1

2017-02-13 Thread Abhinav Patil
Hi,

Is kafka stream run on 0.9.0.1 ?

-- 
*Regards,*
*Abhinav PATIL*
Mobile: +91-902812760


Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Nina Hanzlikova
Hi,

I am a colleague of Ian's. We use the following processing pipeline in
stream app he mentions:
https://github.com/zalando-incubator/pipeline-backbone

The streams are built using:

object Run extends App {
  // ...

  private val latch = new CountDownLatch(1)

  private val builder = {
val b = new KStreamBuilder()
TextPipeline.initializePipelineKStream(
  b,
  Serdes.serdeFrom(new TextPipelineSerializer(), new
TextPipelineDeserializer()),
  Some(latch)
)
b
  }

  private val streams = {
val s = TextPipeline.createKStreams(builder, props)
s.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  override def uncaughtException(t: Thread, e: Throwable): Unit = {
log.error(s"Unexpected Exception caught in thread [${t.getName}]:", e)
  }
})
s
  }

  sys.addShutdownHook(shutdown())

  try {


log.info("Starting...")
streams.start()
loopForever()
  } catch {
case NonFatal(e) => log.error("Exception starting kafka streams", e)
  } finally {
streams.close()
  }

  def shutdown(): Unit = {
log.warn("Shutting down the system")
streams.close()
  }

}


TextPipeline.initializePipelineKStream(builder: KStreamBuilder,
tpdSerde: Serde[TextPipelineDatum], latch: Option[CountDownLatch]) = {
  val coord = new KafkaStreamsBackboneCoordinator(backbone, latch)
 // from the linked pipeline
  val kstream = builder.stream(new StringSerde, tpdSerde, INPUT_TOPIC)
  kstream.transformValues[Xor[TransformationPipelineFailure,
TextPipelineDatum]](coord)
.flatMapValues(new
KafkaStreamValidDatumValueMapper[TextPipelineDatum])
  // from the linked pipeline
}

TextPipeline.createKStreams(builder: KStreamBuilder, props:
Properties): KafkaStreams =
  new KafkaStreams(builder, new StreamsConfig(props))


The stream processing being done is quite slow and intensive (can take
of the order of minutes). We also have a large number of partitions on
our input topic (96), with replication factor 3. We have dropped our
max.poll.records to 10 and still see this problem.

Thanks for your help and best regards,

Nina

On 13 February 2017 at 08:09, Eno Thereska  wrote:
> +1 (non binding)
>
> Checked streams. Verified that stream tests work and examples off 
> confluentinc/examples/kafka-streams work.
>
> Thanks
> Eno
>
>> On 10 Feb 2017, at 16:51, Ewen Cheslack-Postava  wrote:
>>
>> Hello Kafka users, developers and client-developers,
>>
>> This is RC1 for release of Apache Kafka 0.10.2.0.
>>
>> This is a minor version release of Apache Kafka. It includes 19 new KIPs.
>> See the release notes and release plan (https://cwiki.apache.org/
>> confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A few
>> feature highlights: SASL-SCRAM support, improved client compatibility to
>> allow use of clients newer than the broker, session windows and global
>> tables in the Kafka Streams API, single message transforms in the Kafka
>> Connect framework.
>>
>> Important note: in addition to the artifacts generated using JDK7 for Scala
>> 2.10 and 2.11, this release also includes experimental artifacts built
>> using JDK8 for Scala 2.12.
>>
>> Important code changes since RC0 (non-docs, non system tests):
>>
>> * KAFKA-4728; KafkaConsumer#commitSync should copy its input
>> * KAFKA-4441; Monitoring incorrect during topic creation and deletion
>> * KAFKA-4734; Trim the time index on old segments
>> * KAFKA-4725; Stop leaking messages in produce request body when requests
>> are delayed
>> * KAFKA-4716: Fix case when controller cannot be reached
>>
>> Release notes for the 0.10.2.0 release:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/
>>
>> * Javadoc:
>> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
>>
>> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
>>
>>
>> * Documentation:
>> http://kafka.apache.org/0102/documentation.html
>>
>> * Protocol:
>> http://kafka.apache.org/0102/protocol.html
>>
>> * Successful Jenkins builds for the 0.10.2 branch:
>> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/74/
>> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/25/
>>
>> /**
>>
>> Thanks,
>> Ewen
>



-- 
By monitor glow
On conference paper due
A grad student naps.
 -- PhD Haiku


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Description: 
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

Observed situation: [^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep these variables during MirrorMaker restart.


  was:
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

Observed situation: [^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variables during MirrorMaker restart.



>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
> Observed situation: [^1_mm_stop.png]
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep these variables during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Description: 
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

Observed situation: [^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variables during MirrorMaker restart.


  was:
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

Observed situation: [^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.



>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
> Observed situation: [^1_mm_stop.png]
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep this variables during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Affects Version/s: 0.10.1.1
  Component/s: tools

>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.10.1.1
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
> Observed situation: [^1_mm_stop.png]
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep this variable during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Description: 
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

Observed situation: [^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.


  was:
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png! 
[^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.



>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
> Observed situation: [^1_mm_stop.png]
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep this variable during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Description: 
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png! 
[^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.


  was:
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png|thumbnail! 
[^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.



>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
>  !1_mm_stop.png! 
> [^1_mm_stop.png]
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep this variable during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Description: 
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png|thumbnail! 
[^1_mm_stop.png]

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.


  was:
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png|thumbnail! 

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.



>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
>  !1_mm_stop.png|thumbnail! 
> [^1_mm_stop.png]
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep this variable during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Description: 
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png! 

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.


  was:
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png|thumbnail! 

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.



>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
>  !1_mm_stop.png! 
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep this variable during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-4760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tomasz Gański updated KAFKA-4760:
-
Description: 
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png|thumbnail! 

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.


  was:
When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png! 

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.



>  Restarting one MirrorMaker causes a global pause in mirroring.
> ---
>
> Key: KAFKA-4760
> URL: https://issues.apache.org/jira/browse/KAFKA-4760
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tomasz Gański
> Attachments: 1_mm_stop.png
>
>
> When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
> stop working and lead to big consumer lag. 
>  !1_mm_stop.png|thumbnail! 
> It becoming huge problem when you want keep system stable and guarantee that 
> crucial messages come as soon as possible. In this case each deployment or 
> setting change is risky. 
> It would be grate if MirrorMaker could rejoin to previous assigned partitions 
> and not caused consumer group rebalance.
> The rejoins to existing consumer group is based on *memberId* and *group 
> generation id*.  I would nice if it would be possible to make them 
> persistence and keep this variable during MirrorMaker restart.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4760) Restarting one MirrorMaker causes a global pause in mirroring.

2017-02-13 Thread JIRA
Tomasz Gański created KAFKA-4760:


 Summary:  Restarting one MirrorMaker causes a global pause in 
mirroring.
 Key: KAFKA-4760
 URL: https://issues.apache.org/jira/browse/KAFKA-4760
 Project: Kafka
  Issue Type: Improvement
Reporter: Tomasz Gański
 Attachments: 1_mm_stop.png

When 1 MirrorMaker is restarted in consumer group, the whole consumer group 
stop working and lead to big consumer lag. 

 !1_mm_stop.png|thumbnail! 

It becoming huge problem when you want keep system stable and guarantee that 
crucial messages come as soon as possible. In this case each deployment or 
setting change is risky. 

It would be grate if MirrorMaker could rejoin to previous assigned partitions 
and not caused consumer group rebalance.

The rejoins to existing consumer group is based on *memberId* and *group 
generation id*.  I would nice if it would be possible to make them persistence 
and keep this variable during MirrorMaker restart.




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2544: KFKA-4340: update the upgrade.html

2017-02-13 Thread becketqin
GitHub user becketqin opened a pull request:

https://github.com/apache/kafka/pull/2544

KFKA-4340: update the upgrade.html



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/becketqin/kafka KAFKA-4340_follow_up

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2544.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2544


commit 03a3beedfa3063574586fa337463ee792ce6d17e
Author: Jiangjie Qin 
Date:   2017-02-13T10:10:34Z

Follow up patch for KAFKA-4340 to update the upgrade.html




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Correct prefetching of data to KTable-like structure on application startup

2017-02-13 Thread Jan Lukavský

Hi Michael,

sorry for my late answer. Configuring the topic as you suggest is one 
option (and I will configure it that way), but I wanted to combine the 
two data retention mechanisms (if possible). I would like to use log 
compaction, so that I will always get at least the last message for 
given key, but I would also like to use the classical temporal data 
retention, which would function as a sort of TTL for the keys - if a key 
doesn't get an update for the configured period of time, if could be 
removed. That way I could ensure that out-dated keys could be removed.


Is there any other option for this? And can kafka be configured this way?

Best,

 Jan

On 02/09/2017 12:08 PM, Michael Noll wrote:

Jan,


  - if I don't send any data to a kafka partition for a period longer then

the data retention interval, then all data from the partition is wiped out

If I interpret your first and second message in this email thread
correctly, then you are talking only about your "state topic" here, i.e.
the topic that you read into a KTable.  You should configure this topic to
use log compaction, which will ensure that the latest value for a given key
will never be wiped.  So even if you don't send any data to a Kafka
partition of this (now log-compacted) "state topic" for a long period of
time, you'd always have access to (at least) the latest value for every key.

Would that help?

-Michael





On Thu, Feb 9, 2017 at 10:16 AM, Jan Lukavský  wrote:


Hi Matthias,

first of all, thanks for your answer. Sorry if I didn't explain the
problem well, I didn't want to dig too much into detail to focus on the
important and maybe the result was not clear.

My fault, I will try to explain in again. I have two KafkaConsumers in two
separate threads consuming from two topics - let's call the first one
"stream topic" (processed like KStream)

and the second one "state topic" (processed like KTable). The state topic
carries a persistent data that I need in order to process the stream topic,
so I need to cache the state topic

locally before starting consumption of the stream topic. When the
application is running normally, there seems to be no issue with this,

because the state topic is updated asynchronously and I use internal locks
to synchronize the processing inside the application. So far, everything is
fine.


The problem might arise when the application starts - then I do the
following:

  - lock processing of the stream topic (because I don't have the state
topic cached)

  - read the current offset N from the state topic (which gives me offsets
of a message that should be expected next, that is message that has not yet
been written)

  - reset offset of the state topic to beginning and read it until I read
offset N - 1, which tells me that I have cached all the data I need to
process the stream topic, so I unlock the stream processing and continue

All this works well, except for some very rare situation, when the
following happens (as I understand it, maybe here I am making some mistake):

  - for a long period of time there is no update to (at least single
partition) of the state topic

  - when I try to cache the state topic during startup as explained above,
it might never finish, because I will never get a message with offset N - 1
- that is because I will not get any message at all, because all of the
data has been wiped out

  - because I don't know if I get all the data from the state topic, I
cannot start processing the stream topic and the whole application is
stuck, until first message arrives into all partition of the state topic
(which might even never happen)

  - I might use some sort of timeout to handle this, but this could be
dangerous, relying on KafkaConsumer.poll() returning empty records sounds
to me a little fragile too (because this might also indicate that no
records could have been fetched within the timeout, am I right?), what
would totally solve my issue would be that during data retention, the last
message would always be kept, and therefore I will always get the message
with offset N - 1, and the whole issue would vanish.

The situation when a partition on the state topic gets no updates during
long time happens mostly in development environment (where there is little
to no traffic), but I sense that this could be an issue in production too,
for example due to some repartitioning of topics.

Does that make any sense to you now?

Thanks again for your response,

  Jan



On 02/09/2017 08:00 AM, Matthias J. Sax wrote:


Jan,

you scenario is quite complex and I am not sure if I understood every
part of it. I try to break it down:

In my scenario on startup, I want to read all data from a topic (or a

subset of its partitions),
wait until all the old data has been cached and then start processing of
a different stream


That is hard to accomplish in general. Kafka Streams internally uses
KafkaConsumer (one instance per StreamThread) and thus, does rely on the
consumer's 

Re: [VOTE] 0.10.2.0 RC1

2017-02-13 Thread Eno Thereska
+1 (non binding)

Checked streams. Verified that stream tests work and examples off 
confluentinc/examples/kafka-streams work.

Thanks
Eno

> On 10 Feb 2017, at 16:51, Ewen Cheslack-Postava  wrote:
> 
> Hello Kafka users, developers and client-developers,
> 
> This is RC1 for release of Apache Kafka 0.10.2.0.
> 
> This is a minor version release of Apache Kafka. It includes 19 new KIPs.
> See the release notes and release plan (https://cwiki.apache.org/
> confluence/display/KAFKA/Release+Plan+0.10.2.0) for more details. A few
> feature highlights: SASL-SCRAM support, improved client compatibility to
> allow use of clients newer than the broker, session windows and global
> tables in the Kafka Streams API, single message transforms in the Kafka
> Connect framework.
> 
> Important note: in addition to the artifacts generated using JDK7 for Scala
> 2.10 and 2.11, this release also includes experimental artifacts built
> using JDK8 for Scala 2.12.
> 
> Important code changes since RC0 (non-docs, non system tests):
> 
> * KAFKA-4728; KafkaConsumer#commitSync should copy its input
> * KAFKA-4441; Monitoring incorrect during topic creation and deletion
> * KAFKA-4734; Trim the time index on old segments
> * KAFKA-4725; Stop leaking messages in produce request body when requests
> are delayed
> * KAFKA-4716: Fix case when controller cannot be reached
> 
> Release notes for the 0.10.2.0 release:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/RELEASE_NOTES.html
> 
> *** Please download, test and vote by Monday, Feb 13, 5pm PT ***
> 
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
> 
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/
> 
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
> 
> * Javadoc:
> http://home.apache.org/~ewencp/kafka-0.10.2.0-rc1/javadoc/
> 
> * Tag to be voted upon (off 0.10.2 branch) is the 0.10.2.0 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=e825b7994bf8c8c4871d1e0973e287e6d31c7ae4
> 
> 
> * Documentation:
> http://kafka.apache.org/0102/documentation.html
> 
> * Protocol:
> http://kafka.apache.org/0102/protocol.html
> 
> * Successful Jenkins builds for the 0.10.2 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-0.10.2-jdk7/74/
> System tests: https://jenkins.confluent.io/job/system-test-kafka-0.10.2/25/
> 
> /**
> 
> Thanks,
> Ewen