[jira] [Created] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords

2018-03-07 Thread JIRA
Maciej Bryński created KAFKA-6626:
-

 Summary: Performance bottleneck in Kafka Connect sendRecords
 Key: KAFKA-6626
 URL: https://issues.apache.org/jira/browse/KAFKA-6626
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Maciej Bryński
 Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png

Kafka Connect uses an IdentityHashMap for storing records.

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]

Unfortunately, this solution is very slow (roughly two times slower than a regular 
HashMap / HashSet).

Benchmark results (benchmark code attached):
{code:java}
Identity 3977
Set 2442
Map 2207
Fast Set 2067
{code}
This problem significantly slows down Kafka Connect.
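For reference, a minimal sketch of the kind of micro-benchmark the attached MapPerf.java presumably performs (the iteration count, key shape, and class names below are illustrative assumptions, not the actual attachment):
{code:java}
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class MapPerfSketch {
    public static void main(String[] args) {
        final int n = 10_000_000; // illustrative iteration count

        // IdentityHashMap hashes by reference (System.identityHashCode) and
        // compares keys with ==, while HashMap uses hashCode()/equals().
        System.out.println("IdentityHashMap: " + time(new IdentityHashMap<>(), n) + " ms");
        System.out.println("HashMap:         " + time(new HashMap<>(), n) + " ms");
    }

    private static long time(Map<String, Boolean> map, int n) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            // new String(...) forces a distinct identity for every iteration
            String key = new String("record-" + (i % 1000));
            map.put(key, Boolean.TRUE);
            map.remove(key);
        }
        return System.currentTimeMillis() - start;
    }
}
{code}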

!image-2018-03-08-08-35-19-247.png!

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6625) kafka offset reset when I upgrade kafka from 0.11.0 to 1.0.0 version

2018-03-07 Thread barry010 (JIRA)
barry010 created KAFKA-6625:
---

 Summary: kafka offset reset when I upgrade kafka from 0.11.0 to 
1.0.0 version
 Key: KAFKA-6625
 URL: https://issues.apache.org/jira/browse/KAFKA-6625
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0
Reporter: barry010
 Attachments: consumer_offset-36.txt, flume.log.txt, server.log.txt

While upgrading Kafka from 0.11.0 to 1.0.0, I shut down broker01 with a kill 
signal, updated the code, and started the new broker01. I then found that the 
consume rate of one of my consumer groups rose to 10M/s.

Looking at the __consumer_offsets-63 partition, I found that most of the 
partition's offsets were reset at timestamp 1520316174188 (2018/3/6 14:02:54). 
This timestamp falls between the time I shut down broker01 and the time I 
started the new broker01.

I have not set auto.offset.reset, and this topic has only one consumer group, 
which consumes via Flume.

I have attached three logs: the consumer log, the coordinator log, and the 
__consumer_offsets info.


 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-186: Increase offsets retention default to 7 days

2018-03-07 Thread Ted Yu
+1
 Original message 
From: Guozhang Wang
Date: 3/7/18 9:17 PM (GMT-08:00)
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-186: Increase offsets retention default to 7 days
+1 (binding).

On Wed, Mar 7, 2018 at 5:04 PM, James Cheng  wrote:

> +1 (non-binding)
>
> -James
>
> > On Mar 7, 2018, at 1:20 PM, Jay Kreps  wrote:
> >
> > +1
> >
> > I think we can improve this in the future, but this simple change will
> > avoid a lot of pain. Thanks for reviving it Ewen.
> >
> > -Jay
> >
> > On Mon, Mar 5, 2018 at 11:35 AM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> >> I'd like to kick off voting for KIP-186:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 186%3A+Increase+offsets+retention+default+to+7+days
> >>
> >> This is the trivial fix that people in the DISCUSS thread were in favor
> of.
> >> There are some ideas for further refinements, but I think we can follow
> up
> >> with those in subsequent KIPs, see the discussion thread for details.
> Also
> >> note this is related, but complementary, to
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
> >> .
> >>
> >> And of course +1 (binding) from me.
> >>
> >> Thanks,
> >> Ewen
> >>
>
>


-- 
-- Guozhang


Re: [VOTE] KIP-186: Increase offsets retention default to 7 days

2018-03-07 Thread Guozhang Wang
+1 (binding).

On Wed, Mar 7, 2018 at 5:04 PM, James Cheng  wrote:

> +1 (non-binding)
>
> -James
>
> > On Mar 7, 2018, at 1:20 PM, Jay Kreps  wrote:
> >
> > +1
> >
> > I think we can improve this in the future, but this simple change will
> > avoid a lot of pain. Thanks for reviving it Ewen.
> >
> > -Jay
> >
> > On Mon, Mar 5, 2018 at 11:35 AM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> >> I'd like to kick off voting for KIP-186:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 186%3A+Increase+offsets+retention+default+to+7+days
> >>
> >> This is the trivial fix that people in the DISCUSS thread were in favor
> of.
> >> There are some ideas for further refinements, but I think we can follow
> up
> >> with those in subsequent KIPs, see the discussion thread for details.
> Also
> >> note this is related, but complementary, to
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
> >> .
> >>
> >> And of course +1 (binding) from me.
> >>
> >> Thanks,
> >> Ewen
> >>
>
>


-- 
-- Guozhang


Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

2018-03-07 Thread John Roesler
On Wed, Mar 7, 2018 at 8:03 PM, John Roesler  wrote:

> Thanks Ted,
>
> Sure thing; I updated the example code in the KIP with a little snippet.
>
> -John
>
> On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu  wrote:
>
>> Looks good.
>>
>> See if you can add punctuator into the sample code.
>>
>> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler  wrote:
>>
>> > Dear Kafka community,
>> >
>> > I am proposing KIP-267 to augment the public Streams test utils API.
>> > The goal is to simplify testing of Kafka Streams applications.
>> >
>> > Please find details in the
>> > wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils
>> >
>> > An initial WIP PR can be found here:https://github.com/
>> > apache/kafka/pull/4662
>> >
>> > I also included the user-list (please hit "reply-all" to include both
>> > lists in this KIP discussion).
>> >
>> > Thanks,
>> >
>> > -John
>> >
>>
>
>


[DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

2018-03-07 Thread John Roesler
Dear Kafka community,

I am proposing KIP-267 to augment the public Streams test utils API.
The goal is to simplify testing of Kafka Streams applications.

Please find details in the wiki:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils

An initial WIP PR can be found here: https://github.com/apache/kafka/pull/4662

I also included the user-list (please hit "reply-all" to include both
lists in this KIP discussion).

Thanks,

-John
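
To give a flavor of the goal, a hypothetical test in the proposed style (class
and method names are guesses based on the KIP draft, not a finalized API;
assumes JUnit and a Processor implementation called MyProcessor):

// Hypothetical sketch only; see the KIP wiki page for the actual proposal.
MockProcessorContext context = new MockProcessorContext();
MyProcessor processor = new MyProcessor();   // the Processor under test
processor.init(context);

processor.process("key", "value");
// Assert on what the processor forwarded downstream, without a broker:
assertEquals(1, context.forwarded().size());

// Punctuators registered via context.schedule(...) can be fired by hand:
context.scheduledPunctuators().get(0).getPunctuator().punctuate(42L);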


[jira] [Created] (KAFKA-6624) log segment deletion could cause a disk to be marked offline incorrectly

2018-03-07 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6624:
--

 Summary: log segment deletion could cause a disk to be marked 
offline incorrectly
 Key: KAFKA-6624
 URL: https://issues.apache.org/jira/browse/KAFKA-6624
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
Reporter: Jun Rao


Saw the following log.

[2018-03-06 23:12:20,721] ERROR Error while flushing log for topic1-0 in dir /data01/kafka-logs with offset 80993 (kafka.server.LogDirFailureChannel)
java.nio.channels.ClosedChannelException
        at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:110)
        at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:379)
        at org.apache.kafka.common.record.FileRecords.flush(FileRecords.java:163)
        at kafka.log.LogSegment$$anonfun$flush$1.apply$mcV$sp(LogSegment.scala:375)
        at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
        at kafka.log.LogSegment$$anonfun$flush$1.apply(LogSegment.scala:374)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
        at kafka.log.LogSegment.flush(LogSegment.scala:374)
        at kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1374)
        at kafka.log.Log$$anonfun$flush$1$$anonfun$apply$mcV$sp$4.apply(Log.scala:1373)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.log.Log$$anonfun$flush$1.apply$mcV$sp(Log.scala:1373)
        at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
        at kafka.log.Log$$anonfun$flush$1.apply(Log.scala:1368)
        at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
        at kafka.log.Log.flush(Log.scala:1368)
        at kafka.log.Log$$anonfun$roll$2$$anonfun$apply$1.apply$mcV$sp(Log.scala:1343)
        at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

[2018-03-06 23:12:20,722] INFO [ReplicaManager broker=0] Stopping serving replicas in dir /data01/kafka-logs (kafka.server.ReplicaManager)

It seems that topic1 was being deleted around the time when flushing was 
called. Then flushing hit an IOException, which caused the disk to be marked 
offline incorrectly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6623) Consider renaming inefficient RecordBatch operations

2018-03-07 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6623:
--

 Summary: Consider renaming inefficient RecordBatch operations
 Key: KAFKA-6623
 URL: https://issues.apache.org/jira/browse/KAFKA-6623
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Certain batch-level operations are only efficient with the new message format 
version. For example, {{RecordBatch.baseOffset}} requires decompression for the 
old message format versions. It is currently a bit too easy to overlook the 
performance implications of using these methods, which results in issues like 
KAFKA-6622. We should consider either renaming them to make the cost more 
apparent, or modifying the API so that the old message format does not expose 
these operations as conveniently. A similar case is the record count, which is 
efficient in v2 but inefficient for older formats. We handled this case by 
exposing a {{countOrNull}} method which only returns the count for v2.
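
For illustration only (this is a sketch of the caller-side problem, not a proposed API), the cost difference today forces callers to remember to gate batch-level accessors on the magic byte:
{code:java}
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

// Sketch: RecordBatch, Record and MAGIC_VALUE_V2 are the existing classes and
// constant in org.apache.kafka.common.record; this helper is hypothetical.
public class BatchOffsetExample {
    static long firstOffset(RecordBatch batch) {
        if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
            // Cheap for v2: read straight from the batch header, no decompression.
            return batch.baseOffset();
        }
        // For older formats baseOffset() forces decompression of the whole batch,
        // so derive the value while iterating the records instead.
        for (Record record : batch)
            return record.offset();
        return -1L; // empty batch
    }
}
{code}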



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-186: Increase offsets retention default to 7 days

2018-03-07 Thread James Cheng
+1 (non-binding)

-James

> On Mar 7, 2018, at 1:20 PM, Jay Kreps  wrote:
> 
> +1
> 
> I think we can improve this in the future, but this simple change will
> avoid a lot of pain. Thanks for reviving it Ewen.
> 
> -Jay
> 
> On Mon, Mar 5, 2018 at 11:35 AM, Ewen Cheslack-Postava 
> wrote:
> 
>> I'd like to kick off voting for KIP-186:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 186%3A+Increase+offsets+retention+default+to+7+days
>> 
>> This is the trivial fix that people in the DISCUSS thread were in favor of.
>> There are some ideas for further refinements, but I think we can follow up
>> with those in subsequent KIPs, see the discussion thread for details. Also
>> note this is related, but complementary, to
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
>> .
>> 
>> And of course +1 (binding) from me.
>> 
>> Thanks,
>> Ewen
>> 



[jira] [Created] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly

2018-03-07 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6622:
---

 Summary: GroupMetadataManager.loadGroupsAndOffsets decompresses 
record batch needlessly
 Key: KAFKA-6622
 URL: https://issues.apache.org/jira/browse/KAFKA-6622
 Project: Kafka
  Issue Type: Bug
Reporter: radai rosenblatt
Assignee: radai rosenblatt
 Attachments: kafka batch iteration funtime.png

When reading records from a consumer offsets batch, the entire batch is 
decompressed multiple times (once per record) as part of calling `batch.baseOffset`. 
This is a very expensive operation being called in a loop for no reason:
!kafka batch iteration funtime.png!
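
A sketch of the shape of the fix (illustrative Java, whereas the actual loading code is Scala in GroupMetadataManager): hoist the batch-level call out of the per-record loop so the batch is decompressed at most once:
{code:java}
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

// Sketch only: the method name and surrounding logic are illustrative.
public class LoadOffsetsSketch {
    static void processBatch(RecordBatch batch) {
        // Calling batch.baseOffset() inside the loop would decompress the
        // whole batch once per record for the old message formats.
        final long baseOffset = batch.baseOffset();
        for (Record record : batch) {
            long relativeOffset = record.offset() - baseOffset;
            // ... handle the record ...
        }
    }
}
{code}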



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-07 Thread Matthias J. Sax
Jun,

thanks for your comment. This should actually work for Streams, because
we don't rely on producer "hashing" but specify the partition number
explicitly on send().

About not allowing the number of partitions to change for changelog
topics: for Streams, this seems to imply that we need to create a second
changelog topic for each store with the new partition count. However, it
would be unclear when/if we could delete the old topic. Thus, it moves the
"problem" into the application layer. It's hard for me to judge atm what
the impact would be, but it's something we should pay attention to.


-Matthias

On 3/6/18 3:45 PM, Jun Rao wrote:
> Hi, Matthias,
> 
> Regarding your comment "If it would be time-delay based, it might be
> problematic
> for Kafka Streams: if we get the information that the new input partitions
> are available for producing, we need to enable the new changelog partitions
> for producing, too. If those would not be available yet, because the
> time-delay did not trigger yet, it would be problematic to avoid
> crashing.", could you just enable the changelog topic to write to its new
> partitions immediately?  The input topic can be configured with a delay in
> writing to the new partitions. Initially, there won't be new data produced
> into the newly added partitions in the input topic. However, we could
> prebuild the state for the new input partition and write the state changes
> to the corresponding new partitions in the changelog topic.
> 
> Hi, Jan,
> 
> For a compacted topic, garbage collecting the old keys in the existing
> partitions after partition expansion can be tricky as you pointed out. A
> few options here. (a) Let brokers exchange keys across brokers during
> compaction. This will add complexity on the broker side. (b) Build an
> external tool that scans the compacted topic and drop the prefix of a
> partition if all records in the prefix are removable. The admin can then
> run this tool when the unneeded space needs to be reclaimed. (c) Don't
> support partition change in a compacted topic. This might be ok since most
> compacted topics are not high volume.
> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin  wrote:
> 
>> Hi everyone,
>>
>> Thanks for all the comments! It appears that everyone prefers linear
>> hashing because it reduces the amount of state that needs to be moved
>> between consumers (for stream processing). The KIP has been updated to use
>> linear hashing.
>>
>> Regarding the migration endeavor: it seems that migrating producer library
>> to use linear hashing should be pretty straightforward without
>> much operational endeavor. If we don't upgrade client library to use this
>> KIP, we can not support in-order delivery after partition is changed
>> anyway. Suppose we upgrade client library to use this KIP, if partition
>> number is not changed, the key -> partition mapping will be exactly the
>> same as it is now because it is still determined using murmur_hash(key) %
>> original_partition_num. In other words, this change is backward compatible.
>>
>> Regarding the load distribution: if we use linear hashing, the load may be
>> unevenly distributed because those partitions which are not split may
>> receive twice as much traffic as other partitions that are split. This
>> issue can be mitigated by creating topic with partitions that are several
>> times the number of consumers. And there will be no imbalance if the
>> partition number is always doubled. So this imbalance seems acceptable.
>>
>> Regarding storing the partition strategy as per-topic config: It seems not
>> necessary since we can still use murmur_hash as the default hash function
>> and additionally apply the linear hashing algorithm if the partition number
>> has increased. Not sure if there is any use-case for producer to use a
>> different hash function. Jason, can you check if there is some use-case
>> that I missed for using the per-topic partition strategy?
>>
>> Regarding how to reduce latency (due to state store/load) in stream
>> processing consumer when partition number changes: I need to read the Kafka
>> Stream code to understand how Kafka Stream currently migrate state between
>> consumers when the application is added/removed for a given job. I will
>> reply after I finish reading the documentation and code.
>>
>>
>> Thanks,
>> Dong
>>
>>
>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson 
>> wrote:
>>
>>> Great discussion. I think I'm wondering whether we can continue to leave
>>> Kafka agnostic to the partitioning strategy. The challenge is
>> communicating
>>> the partitioning logic from producers to consumers so that the
>> dependencies
>>> between each epoch can be determined. For the sake of discussion, imagine
>>> you did something like the following:
>>>
>>> 1. The name (and perhaps version) of a partitioning strategy is stored in
>>> topic configuration when a topic is created.
>>> 2. The producer looks 

[DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-03-07 Thread Matthias J. Sax
Hi,

I want to propose KIP-258 for the Streams API to allow storing
timestamps in RocksDB. This feature is the basis to resolve multiple
tickets (issues and feature requests).

Looking forward to your comments about this!

https://cwiki.apache.org/confluence/display/KAFKA/KIP-258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB


-Matthias




signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-07 Thread Jay Kreps
Hey Jason,

I agree. Even apart from this proposal the partitioning strategy is really
an essential part of the metadata for a topic and had we been less lazy we
probably would have included it with the topic metadata.

I think in terms of grandfathering this in, you could have existing topics
just be auto-assigned a "client" partitioning and add a "linear" strategy
(or whatever) that is checked server-side and supported in terms of
re-partitioning.

-Jay
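
A sketch of what such a dependencies hook might look like for a doubling
linear-hashing strategy, referring to the interface in the quoted message
below (the generic types and the implementation here are assumptions for
illustration, not part of the KIP):

import java.util.*;

// Sketch only: maps each partition after the epoch bump to the partitions it
// depends on before the bump. Assumes the signature was
// Map<Integer, List<Integer>> dependencies(int before, int after).
class DoublingLinearHashingStrategy {
    Map<Integer, List<Integer>> dependencies(int before, int after) {
        Map<Integer, List<Integer>> deps = new HashMap<>();
        for (int p = 0; p < after; p++) {
            // Partition p was split off old partition p % before, so its
            // pre-bump records live there and must be consumed first.
            deps.put(p, Collections.singletonList(p % before));
        }
        return deps;
    }
}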

On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson  wrote:

> Great discussion. I think I'm wondering whether we can continue to leave
> Kafka agnostic to the partitioning strategy. The challenge is communicating
> the partitioning logic from producers to consumers so that the dependencies
> between each epoch can be determined. For the sake of discussion, imagine
> you did something like the following:
>
> 1. The name (and perhaps version) of a partitioning strategy is stored in
> topic configuration when a topic is created.
> 2. The producer looks up the partitioning strategy before writing to a
> topic and includes it in the produce request (for fencing). If it doesn't
> have an implementation for the configured strategy, it fails.
> 3. The consumer also looks up the partitioning strategy and uses it to
> determine dependencies when reading a new epoch. It could either fail or
> make the most conservative dependency assumptions if it doesn't know how to
> implement the partitioning strategy. For the consumer, the new interface
> might look something like this:
>
> // Return the partition dependencies following an epoch bump
> Map dependencies(int numPartitionsBeforeEpochBump,
> int numPartitionsAfterEpochBump)
>
> The unordered case then is just a particular implementation which never has
> any epoch dependencies. To implement this, we would need some way for the
> consumer to find out how many partitions there were in each epoch, but
> maybe that's not too unreasonable.
>
> Thanks,
> Jason
>
>
> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak 
> wrote:
>
> > Hi Dong
> >
> > thank you very much for your questions.
> >
> > Regarding the time spent copying data across:
> > It is correct that copying data from a topic with one partition mapping
> to
> > a topic with a different partition mapping takes way longer than we can
> > stop producers. Tens of minutes is a very optimistic estimate here. Many
> > people cannot afford to copy at full steam and therefore will have some rate
> > limiting in place; this can bump the timespan into days. The good part
> > is that the vast majority of the data can be copied while the producers
> are
> > still going. One can then piggyback the consumers on top of this timeframe,
> > by the method mentioned (provide them a mapping from their old offsets to
> > new offsets in their repartitioned topics). In that way we separate
> > migration of consumers from migration of producers (decoupling these is
> > what kafka is strongest at). The time to actually swap over the producers
> > should be kept minimal by ensuring that when a swap attempt is started
> the
> > consumer copying over should be very close to the log end and is expected
> > to finish within the next fetch. The operation should have a time-out and
> > should be "reattemptable".
> >
> > Importance of log compaction:
> > If a producer produces key A to partition 0, it's forever gonna be there,
> > unless it gets deleted. The record might sit in there for years. A new
> > producer started with the new partitions will fail to delete the record in
> > the correct partition. The record will be there forever and one cannot
> > reliably bootstrap new consumers. I cannot see how linear hashing can
> > solve this.
> >
> > Regarding your skipping of userland copying:
> > 100%, copying the data across in userland is, as far as I can see, only a
> > use case for log-compacted topics. Even for log compaction + retention it
> > should only be opt-in. Why did I bring it up? I think log compaction is a
> > very important feature to really embrace Kafka as a "data platform". The
> > point I also want to make is that copying data this way is completely in
> > line with the Kafka architecture: it only consists of reading from and
> > writing to topics.
> >
> > I hope this clarifies why I think we should aim for more than the
> > current KIP. I fear that once the KIP is done, not much more effort will
> > be taken.
> >
> >
> >
> >
> > On 04.03.2018 02:28, Dong Lin wrote:
> >
> >> Hey Jan,
> >>
> >> In the current proposal, the consumer will be blocked on waiting for
> other
> >> consumers of the group to consume up to a given offset. In most cases,
> all
> >> consumers should be close to the LEO of the partitions when the
> partition
> >> expansion happens. Thus the time waiting should not be long e.g. on the
> >> order of seconds. On the other hand, it may take a long time to wait for
> >> the entire partition 

Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-07 Thread Guozhang Wang
If we want to maintain ordering, then a pluggable producer-side partitioner
needs to be coherent with the linear hashing scheme, i.e. we effectively
restrict what kind of customization users can do. However, such restrictions
are hard to enforce programmatically unless we change the API, so maybe we can
only document / educate users to do that when they customize the partitioner
after the upgrade.


Guozhang
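
As a rough illustration of the scheme a custom partitioner would have to stay
coherent with, here is a sketch of a linear-hashing partition function (the
producer's real hash is murmur2 on the serialized key; this is illustrative,
not the final KIP algorithm):

// Sketch only.
class LinearHashingSketch {
    static int partition(int keyHash, int originalPartitions, int currentPartitions) {
        int h = keyHash & 0x7fffffff;    // force non-negative, like the default partitioner
        int level = originalPartitions;
        while (level * 2 <= currentPartitions)
            level *= 2;                  // completed doubling rounds
        int p = h % level;
        if (p < currentPartitions - level) {
            // This slot has already been split in the current round,
            // so re-hash it over the doubled range.
            p = h % (level * 2);
        }
        return p;
    }
}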

On Wed, Mar 7, 2018 at 10:43 AM, Jason Gustafson  wrote:

> Hi Dong,
>
> What is not clear to me is how the use of linear hashing affects the
> partitioning logic in the producer, which the user is currently allowed to
> customize through the Partitioner interface. It sounds like we are
> effectively deprecating that interface since we will only provide ordering
> guarantees across partition changes if linear hashing is used. Maybe you
> can clarify whether that is the intention? Basically I am wondering how
> much additional work it would be to leave that partitioning logic pluggable
> and whether it is worthwhile to do so. One potential downside I can think
> of is that it may complicate compaction, but we don't have a concrete
> proposal for handling that at the moment anyway, so it's hard to say.
>
> Thanks,
> Jason
>
>
> On Tue, Mar 6, 2018 at 3:45 PM, Jun Rao  wrote:
>
> > Hi, Matthias,
> >
> > Regarding your comment "If it would be time-delay based, it might be
> > problematic
> > for Kafka Streams: if we get the information that the new input
> partitions
> > are available for producing, we need to enable the new changelog
> partitions
> > for producing, too. If those would not be available yet, because the
> > time-delay did not trigger yet, it would be problematic to avoid
> > crashing.", could you just enable the changelog topic to write to its new
> > partitions immediately?  The input topic can be configured with a delay
> in
> > writing to the new partitions. Initially, there won't be new data
> produced
> > into the newly added partitions in the input topic. However, we could
> > prebuild the state for the new input partition and write the state
> changes
> > to the corresponding new partitions in the changelog topic.
> >
> > Hi, Jan,
> >
> > For a compacted topic, garbage collecting the old keys in the existing
> > partitions after partition expansion can be tricky as you pointed out. A
> > few options here. (a) Let brokers exchange keys across brokers during
> > compaction. This will add complexity on the broker side. (b) Build an
> > external tool that scans the compacted topic and drop the prefix of a
> > partition if all records in the prefix are removable. The admin can then
> > run this tool when the unneeded space needs to be reclaimed. (c) Don't
> > support partition change in a compacted topic. This might be ok since
> most
> > compacted topics are not high volume.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin  wrote:
> >
> > > Hi everyone,
> > >
> > > Thanks for all the comments! It appears that everyone prefers linear
> > > hashing because it reduces the amount of state that needs to be moved
> > > between consumers (for stream processing). The KIP has been updated to
> > use
> > > linear hashing.
> > >
> > > Regarding the migration endeavor: it seems that migrating producer
> > library
> > > to use linear hashing should be pretty straightforward without
> > > much operational endeavor. If we don't upgrade client library to use
> this
> > > KIP, we can not support in-order delivery after partition is changed
> > > anyway. Suppose we upgrade client library to use this KIP, if partition
> > > number is not changed, the key -> partition mapping will be exactly the
> > > same as it is now because it is still determined using
> murmur_hash(key) %
> > > original_partition_num. In other words, this change is backward
> > compatible.
> > >
> > > Regarding the load distribution: if we use linear hashing, the load may
> > be
> > > unevenly distributed because those partitions which are not split may
> > > receive twice as much traffic as other partitions that are split. This
> > > issue can be mitigated by creating topic with partitions that are
> several
> > > times the number of consumers. And there will be no imbalance if the
> > > partition number is always doubled. So this imbalance seems acceptable.
> > >
> > > Regarding storing the partition strategy as per-topic config: It seems
> > not
> > > necessary since we can still use murmur_hash as the default hash
> function
> > > and additionally apply the linear hashing algorithm if the partition
> > number
> > > has increased. Not sure if there is any use-case for producer to use a
> > > different hash function. Jason, can you check if there is some use-case
> > > that I missed for using the per-topic partition strategy?
> > >
> > > Regarding how to reduce latency (due to state store/load) in stream
> > > processing consumer when 

Re: [VOTE] KIP-186: Increase offsets retention default to 7 days

2018-03-07 Thread Jay Kreps
+1

I think we can improve this in the future, but this simple change will
avoid a lot of pain. Thanks for reviving it Ewen.

-Jay

On Mon, Mar 5, 2018 at 11:35 AM, Ewen Cheslack-Postava 
wrote:

> I'd like to kick off voting for KIP-186:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 186%3A+Increase+offsets+retention+default+to+7+days
>
> This is the trivial fix that people in the DISCUSS thread were in favor of.
> There are some ideas for further refinements, but I think we can follow up
> with those in subsequent KIPs, see the discussion thread for details. Also
> note this is related, but complementary, to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
> .
>
> And of course +1 (binding) from me.
>
> Thanks,
> Ewen
>


Re: [ANNOUNCE] Apache Kafka 1.0.1 Released

2018-03-07 Thread Ismael Juma
Thanks for running the release Ewen and great work everyone!

Ismael

On Tue, Mar 6, 2018 at 1:14 PM, Ewen Cheslack-Postava  wrote:

> The Apache Kafka community is pleased to announce the release for Apache
> Kafka
> 1.0.1.
>
> This is a bugfix release for the 1.0 branch that was first released with
> 1.0.0 about 4 months ago. We've fixed 49 issues since that release. Most of
> these are non-critical, but in aggregate these fixes will have significant
> impact. A few of the more significant fixes include:
>
> * KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
> plugins
> * KAFKA-6185: Selector memory leak with high likelihood of OOM in case of
> down conversion
> * KAFKA-6269: KTable state restore fails after rebalance
> * KAFKA-6190: GlobalKTable never finishes restoring when consuming
> transactional messages
> * KAFKA-6529: Stop file descriptor leak when client disconnects with
> staged receives
> * KAFKA-6238: Issues with protocol version when applying a rolling upgrade
> to 1.0.0
>
>
> All of the changes in this release can be found in the release notes:
>
>
> https://dist.apache.org/repos/dist/release/kafka/1.0.1/RELEASE_NOTES.html
>
>
>
> You can download the source release from:
>
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
> kafka-1.0.1-src.tgz
>
>
> and binary releases from:
>
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
> kafka_2.11-1.0.1.tgz
> (Scala 2.11)
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
> kafka_2.12-1.0.1.tgz
> (Scala 2.12)
>
> 
> ---
>
>
> Apache Kafka is a distributed streaming platform with four core APIs:
>
>
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
>
>
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
>
>
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an output
> stream to one or more output topics, effectively transforming the input
> streams to output streams.
>
>
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might capture
> every change to a table.
>
>
>
> With these APIs, Kafka can be used for two broad classes of application:
>
>
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
>
>
> ** Building real-time streaming applications that transform or react to the
> streams of data.
>
>
>
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
>
>
>
> A big thank you for the following 36 contributors to this release!
>
> Alex Good, Andras Beni, Andy Bryant, Arjun Satish, Bill Bejeck, Colin P.
> Mccabe, Colin Patrick McCabe, ConcurrencyPractitioner, Damian Guy, Daniel
> Wojda, Dong Lin, Edoardo Comar, Ewen Cheslack-Postava, Filipe Agapito,
> fredfp, Guozhang Wang, huxihx, Ismael Juma, Jason Gustafson, Jeremy
> Custenborder, Jiangjie (Becket) Qin, Joel Hamill, Konstantine Karantasis,
> lisa2lisa, Logan Buckley, Manjula K, Matthias J. Sax, Nick Chiu, parafiend,
> Rajini Sivaram, Randall Hauch, Robert Yokota, Ron Dagostino, tedyu,
> Yaswanth Kumar, Yu.
>
>
> We welcome your help and feedback. For more information on how to
> report problems,
> and to get involved, visit the project website at http://kafka.apache.org/
>
>
> Thank you!
> Ewen
>


[jira] [Created] (KAFKA-6621) Update Streams docs with details for UncaughtExceptionHandler

2018-03-07 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6621:
--

 Summary: Update Streams docs with details for 
UncaughtExceptionHandler
 Key: KAFKA-6621
 URL: https://issues.apache.org/jira/browse/KAFKA-6621
 Project: Kafka
  Issue Type: Improvement
  Components: documentation, streams
Reporter: Matthias J. Sax


We should update the docs to explain that calling {{KafkaStreams#close()}} 
within the UncaughtExceptionHandler callback might result in a deadlock, and that 
one should always specify a timeout parameter in this case.

As an alternative to avoid the deadlock, setting a flag in the handler and 
calling {{#close}} outside of the callback should also work.
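
For illustration, a sketch of both safe patterns (timeout value and names are examples; assumes a constructed {{KafkaStreams}} instance {{streams}} and the usual java.util.concurrent imports):
{code:java}
// Option 1: if close() must be called from the handler, always pass a timeout.
streams.setUncaughtExceptionHandler((thread, throwable) ->
    streams.close(30, TimeUnit.SECONDS)); // never the no-arg close() here

// Option 2: only record the failure in the handler and close outside of it.
AtomicBoolean died = new AtomicBoolean(false);
streams.setUncaughtExceptionHandler((thread, throwable) -> died.set(true));
// ... later, e.g. in the main thread:
if (died.get()) {
    streams.close();
}
{code}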



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Question regarding network outage

2018-03-07 Thread Guru Balse
We have a 0.9 Kafka cluster consisting of 7 Kafka brokers colocated with 7
ZooKeepers.

Producers/consumers are going full tilt at this cluster and we measure
delays in response time while we run disruptive tests

Delays in response time are 'acceptable' (a few seconds max) for the
following disruptions on 1 or 2 of these hosts

   - Soft kill kafka-broker/zookeeper
   - Hard kill kafka-broker/zookeeper
   - Soft/hard reboot host


However when we disrupt the network for 1 or 2 hosts using a sequence such
as below, we do see delays of ~ 30 seconds (1 host = topic leader) to ~ 60
seconds (2 hosts including topic leader)

   - Shut down a network interface, '/etc/init.d/network stop'.
   - Sleep for a few minutes
   - Start network interface, '/etc/init.d/network start'


Those delays suggest that there may be a 30-second timeout somewhere that
explains the 30-second delay, and we conjecture that shortening this timeout
may reduce the resulting delay.

We have tried changing various timeouts such as those listed below, but
have had no success so far

   - replica.socket.timeout.ms
   - request.timeout.ms
   - zookeeper.session.timeout
   - zookeeper.session.timeout
   - connections.max.idle.ms
   - controller.socket.timeout.ms
   - group.max.session.timeout.ms
   - group.min.session.timeout.ms

Searches have not yielded any solutions.  Any help/guidance is greatly
appreciated.

Thanks and regards,


*--Guru Balse*
*Principal Software Engineer |Service Cloud | Salesforce.com | Landmark
10th @ HQ  | 510-859-6975*





RE: [VOTE] KIP-186: Increase offsets retention default to 7 days

2018-03-07 Thread Skrzypek, Jonathan
+1 (non-binding)

Jonathan Skrzypek 


-Original Message-
From: Bill Bejeck [mailto:bbej...@gmail.com] 
Sent: 06 March 2018 22:03
To: dev@kafka.apache.org
Subject: Re: [VOTE] KIP-186: Increase offsets retention default to 7 days

+1

Thanks,
Bill

On Tue, Mar 6, 2018 at 2:27 PM, Matthias J. Sax 
wrote:

> +1 (binding)
>
> On 3/6/18 10:43 AM, Vahid S Hashemian wrote:
> > +1 (non-binding)
> >
> > Thanks Ewen.
> >
> > --Vahid
> >
> >
> >
> > From:   Ewen Cheslack-Postava 
> > To: dev@kafka.apache.org
> > Date:   03/05/2018 11:35 AM
> > Subject:[VOTE] KIP-186: Increase offsets retention default to 7
> > days
> >
> >
> >
> > I'd like to kick off voting for KIP-186:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days
> >
> >
> > This is the trivial fix that people in the DISCUSS thread were in 
> > favor of.
> > There are some ideas for further refinements, but I think we can 
> > follow
> up
> > with those in subsequent KIPs, see the discussion thread for details.
> Also
> > note this is related, but complementary, to 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
> >
> > .
> >
> > And of course +1 (binding) from me.
> >
> > Thanks,
> > Ewen
> >
> >
> >
> >
> >
>
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-07 Thread Jason Gustafson
Hi Dong,

What is not clear to me is how the use of linear hashing affects the
partitioning logic in the producer, which the user is currently allowed to
customize through the Partitioner interface. It sounds like we are
effectively deprecating that interface since we will only provide ordering
guarantees across partition changes if linear hashing is used. Maybe you
can clarify whether that is the intention? Basically I am wondering how
much additional work it would be to leave that partitioning logic pluggable
and whether it is worthwhile to do so. One potential downside I can think
of is that it may complicate compaction, but we don't have a concrete
proposal for handling that at the moment anyway, so it's hard to say.

Thanks,
Jason


On Tue, Mar 6, 2018 at 3:45 PM, Jun Rao  wrote:

> Hi, Matthias,
>
> Regarding your comment "If it would be time-delay based, it might be
> problematic
> for Kafka Streams: if we get the information that the new input partitions
> are available for producing, we need to enable the new changelog partitions
> for producing, too. If those would not be available yet, because the
> time-delay did not trigger yet, it would be problematic to avoid
> crashing.", could you just enable the changelog topic to write to its new
> partitions immediately?  The input topic can be configured with a delay in
> writing to the new partitions. Initially, there won't be new data produced
> into the newly added partitions in the input topic. However, we could
> prebuild the state for the new input partition and write the state changes
> to the corresponding new partitions in the changelog topic.
>
> Hi, Jan,
>
> For a compacted topic, garbage collecting the old keys in the existing
> partitions after partition expansion can be tricky as you pointed out. A
> few options here. (a) Let brokers exchange keys across brokers during
> compaction. This will add complexity on the broker side. (b) Build an
> external tool that scans the compacted topic and drop the prefix of a
> partition if all records in the prefix are removable. The admin can then
> run this tool when the unneeded space needs to be reclaimed. (c) Don't
> support partition change in a compacted topic. This might be ok since most
> compacted topics are not high volume.
>
> Thanks,
>
> Jun
>
>
> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin  wrote:
>
> > Hi everyone,
> >
> > Thanks for all the comments! It appears that everyone prefers linear
> > hashing because it reduces the amount of state that needs to be moved
> > between consumers (for stream processing). The KIP has been updated to
> use
> > linear hashing.
> >
> > Regarding the migration endeavor: it seems that migrating producer
> library
> > to use linear hashing should be pretty straightforward without
> > much operational endeavor. If we don't upgrade client library to use this
> > KIP, we can not support in-order delivery after partition is changed
> > anyway. Suppose we upgrade client library to use this KIP, if partition
> > number is not changed, the key -> partition mapping will be exactly the
> > same as it is now because it is still determined using murmur_hash(key) %
> > original_partition_num. In other words, this change is backward
> compatible.
> >
> > Regarding the load distribution: if we use linear hashing, the load may
> be
> > unevenly distributed because those partitions which are not split may
> > receive twice as much traffic as other partitions that are split. This
> > issue can be mitigated by creating topic with partitions that are several
> > times the number of consumers. And there will be no imbalance if the
> > partition number is always doubled. So this imbalance seems acceptable.
> >
> > Regarding storing the partition strategy as per-topic config: It seems
> not
> > necessary since we can still use murmur_hash as the default hash function
> > and additionally apply the linear hashing algorithm if the partition
> number
> > has increased. Not sure if there is any use-case for producer to use a
> > different hash function. Jason, can you check if there is some use-case
> > that I missed for using the per-topic partition strategy?
> >
> > Regarding how to reduce latency (due to state store/load) in stream
> > processing consumer when partition number changes: I need to read the
> Kafka
> > Stream code to understand how Kafka Stream currently migrate state
> between
> > consumers when the application is added/removed for a given job. I will
> > reply after I finish reading the documentation and code.
> >
> >
> > Thanks,
> > Dong
> >
> >
> > On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson 
> > wrote:
> >
> > > Great discussion. I think I'm wondering whether we can continue to
> leave
> > > Kafka agnostic to the partitioning strategy. The challenge is
> > communicating
> > > the partitioning logic from producers to consumers so that the
> > dependencies
> > > between each epoch can be determined. 

Re: [DISCUSS] KIP-257 - Configurable Quota Management

2018-03-07 Thread Rajini Sivaram
Hi Viktor,

Thanks for reviewing the KIP.

1. Yes, that is correct. Typically quotas would depend only on the current
partition state. But if you did want to track deleted partitions, you can
calculate the diff.
2. I can't think of a use case where ISRs or other replica information
would be useful for configuring quotas. Since partition leaders process
fetch/produce requests, leader information is clearly useful in terms of
setting quotas. But I have defined a PartitionMetadata trait rather than just
using the leader as an int, so that we can add additional methods in the
future if required. This keeps the interface extensible. Did you have any use
case in mind where additional metadata would be useful?

Regards,

Rajini
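
A sketch of the diff computation mentioned in point 1 (the callback shape and
type names are placeholders, not the KIP-257 interface; assumes the usual
java.util imports and Kafka's TopicPartition):

// Sketch only: detect removed partitions as the difference between the
// previous update and the current one.
private Set<TopicPartition> knownPartitions = new HashSet<>();

void updatePartitionMetadata(Map<TopicPartition, PartitionMetadata> current) {
    Set<TopicPartition> removed = new HashSet<>(knownPartitions);
    removed.removeAll(current.keySet());          // present before, absent now
    knownPartitions = new HashSet<>(current.keySet());
    // ... drop any quota state kept for the removed partitions ...
}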

On Tue, Mar 6, 2018 at 8:56 AM, Viktor Somogyi 
wrote:

> Hi Rajini,
>
> I've read through your KIP and it looks good, I only have two things to
> clarify.
> 1. How do we detect removed partitions in updatePartitionMetadata? I'm
> presuming that the list here is the currently existing map of partitions,
> so if something is removed it can be calculated as the diff of the current
> and the previous update. Is that right?
> 2. PartitionMetadata contains only the leader at this moment, however there
> are similar classes that contain more information, like the replicas, isr,
> offline replicas. I think including them might make sense to provide a more
> robust API. What do you think?
>
> Thanks,
> Viktor
>
> On Wed, Feb 21, 2018 at 7:57 PM, Rajini Sivaram 
> wrote:
>
> > Hi all,
> >
> > I have submitted KIP-257 to enable customisation of client quota
> > computation:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 257+-+Configurable+Quota+Management
> >
> >
> > The KIP proposes to make quota management pluggable to enable group-based
> > and partition-based quotas for clients.
> >
> > Feedback and suggestions are welcome.
> >
> > Thank you...
> >
> > Regards,
> >
> > Rajini
> >
>


[jira] [Created] (KAFKA-6620) Documentation about "exactly_once" doesn't mention "transaction.state.log.min.isr"

2018-03-07 Thread Daniel Qian (JIRA)
Daniel Qian created KAFKA-6620:
--

 Summary: Documentation about "exactly_once" doesn't mention 
"transaction.state.log.min.isr" 
 Key: KAFKA-6620
 URL: https://issues.apache.org/jira/browse/KAFKA-6620
 Project: Kafka
  Issue Type: Bug
  Components: documentation, streams
Reporter: Daniel Qian


Documentation about "processing.guarantee" says:
{quote}The processing guarantee that should be used. Possible values are 
{{at_least_once}}(default) and {{exactly_once}}. Note that exactly-once 
processing requires a cluster of at least three brokers by default what is the 
recommended setting for production; *for development you can change this, by 
adjusting broker setting* 
`{color:#FF}*transaction.state.log.replication.factor*{color}`
{quote}
If one only sets *transaction.state.log.replication.factor=1* but leaves 
*transaction.state.log.min.isr* at its default value (which is 2), the Streams 
application will break.

Hope you can update the doc accordingly, thanks.
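
For example, a single-broker development setup needs both broker overrides (values shown are for development only, not production):
{code}
# broker configuration for a development cluster with fewer than 3 brokers
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
{code}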



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)