Re: [DISCUSS] KIP-297: Externalizing Secrets for Connect Configurations

2018-06-11 Thread Konstantine Karantasis
Sounds great, happy to hear we all agree!
Thanks everyone!


- Konstantine


On Mon, Jun 11, 2018 at 4:22 PM, Colin McCabe  wrote:

> Sounds good.  Thanks, Konstantine.
>
> Colin
>
>
> On Mon, Jun 11, 2018, at 13:41, Rajini Sivaram wrote:
> > Hi Konstantine,
> >
> > Sounds reasonable to me too.
> >
> > Regards,
> >
> > Rajini
> >
> > On Mon, Jun 11, 2018 at 7:55 PM, Robert Yokota 
> wrote:
> >
> > > Hi Konstantine,
> > >
> > > Sounds reasonable!
> > >
> > > Thanks,
> > > Robert
> > >
> > > On Mon, Jun 11, 2018 at 11:49 AM, Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Hi everyone, after fixing an issue with a regular expression in
> Connect's
> > > > class loading isolation of the new component type ConfigProvider
> here:
> > > >
> > > > https://github.com/apache/kafka/pull/5177
> > > >
> > > > I noticed that the new interface ConfigProvider, along with its first
> > > > implementation FileConfigProvider, have been placed in the package:
> > > >
> > > > org.apache.kafka.common.config
> > > >
> > > > This specific package is mentioned in KIP-297 in a few places, but not in
> > > > any code snippets. I'd like to suggest moving the interface and any current
> > > > or future implementation classes to a new package named:
> > > >
> > > > org.apache.kafka.common.config.provider
> > > >
> > > > and update the KIP document accordingly.
> > > >
> > > > This seems to make sense in general. But, specifically, in Connect
> it is
> > > > desired since we treat ConfigProvider implementations as Connect
> > > components
> > > > that are loaded in isolation. Having a package for config providers
> will
> > > > allow us to avoid making any assumptions with respect to the name of
> a
> > > > class that implements `ConfigProvider` and is included in Apache
> Kafka.
> > > It
> > > > will suffice for this class to reside in the package
> > > > org.apache.kafka.common.config.provider.
> > > >
> > > > Let me know if this is a reasonable request and if you agree on
> amending
> > > > the KIP description.
> > > >
> > > > - Konstantine
> > > >
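For illustration, a minimal sketch of a provider class that would live in the proposed package. Class and method names are illustrative only; the authoritative `ConfigProvider` signatures are the ones defined in KIP-297, and at this point in the thread `get()` returns a map of key-value pairs.

```
// Illustrative sketch only. A provider shipped with Apache Kafka would live in
// the proposed package, so that Connect's isolation can treat providers by
// package rather than by hard-coded class name. A real implementation would
// declare `implements ConfigProvider`; that is omitted here because the exact
// interface signatures were still being finalized in this thread.
package org.apache.kafka.common.config.provider;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class EnvConfigProvider {

    // Provider-specific configuration, passed in by the runtime.
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    // Return all key-value pairs resolvable under the given path.
    public Map<String, String> get(String path) {
        return new HashMap<>(System.getenv());
    }

    // Return only the requested keys under the given path.
    public Map<String, String> get(String path, Set<String> keys) {
        Map<String, String> data = new HashMap<>();
        for (String key : keys) {
            String value = System.getenv(key);
            if (value != null)
                data.put(key, value);
        }
        return data;
    }

    public void close() {
        // no resources to release
    }
}
```

A connector configuration would then reference a secret with the `${provider:[path:]key}` syntax mentioned later in the thread, e.g. `${vault:connect/secrets:db.password}` (the provider alias, path, and key here are made up).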
> > > >
> > > >
> > > > On Wed, May 16, 2018 at 10:33 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for the update, Robert. Looks good to me.
> > > > >
> > > > > Regards,
> > > > >
> > > > > Rajini
> > > > >
> > > > > On Wed, May 16, 2018 at 4:43 PM, Robert Yokota  >
> > > > wrote:
> > > > >
> > > > > > Hi Rajini,
> > > > > >
> > > > > > Thanks for the excellent feedback!
> > > > > >
> > > > > > I've made the API changes that you've requested in the KIP.
> > > > > >
> > > > > >
> > > > > > > 1. Are we expecting one provider instance with different
> contexts
> > > > > > > provided to `ConfigProvider.get()`? If we created a different
> > > > provider
> > > > > > > instance for each context, we could deal with scheduling
> reloads in
> > > > the
> > > > > > > provider implementation?
> > > > > >
> > > > > > Yes, there would be one provider instance.  I've collapsed the
> > > > > > ConfigContext and the ConfigChangeCallback by adding a parameter
> > > > delayMs
> > > > > to
> > > > > > indicate when the change will happen.  When a particular
> > > ConfigProvider
> > > > > > retrieves a lease duration along with a key, it can either 1)
> > > > schedule a
> > > > > > background thread to push out the change when it happens (at
> which
> > > time
> > > > > the
> > > > > delayMs will be 0), or 2) invoke the callback immediately with the
> lease
> > > > > > duration set as delayMs (of course, in this case the values for
> the
> > > > keys
> > > > > > will be the old values).  A ConfigProvider could be parameterized
> to do
> > > > one
> > > > > > or the other.
> > > > > >
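For illustration, the two notification styles described above, sketched against a hypothetical callback shape (an `onChange` method carrying the `delayMs` parameter). The names are assumptions, not the KIP's final API.

```
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only; the callback type and method names are made up for this sketch.
public class LeaseNotificationSketch {

    interface ConfigChangeCallback {
        void onChange(String path, Map<String, String> values, long delayMs);
    }

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Style 1: push the change out when the lease actually expires; the
    // callback then sees delayMs = 0 and the new values.
    void notifyWhenLeaseExpires(ConfigChangeCallback callback, String path,
                                Map<String, String> newValues, long leaseMs) {
        executor.schedule(() -> callback.onChange(path, newValues, 0L),
                leaseMs, TimeUnit.MILLISECONDS);
    }

    // Style 2: invoke the callback right away, passing the lease duration as
    // delayMs; the values handed over are still the old ones.
    void notifyImmediately(ConfigChangeCallback callback, String path,
                           Map<String, String> oldValues, long leaseMs) {
        callback.onChange(path, oldValues, leaseMs);
    }
}
```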
> > > > > >
> > > > > > > 2. Couldn't ConfigData  be an interface that just returns a
> map of
> > > > > > > key-value pairs. Providers that return metadata could extend
> it to
> > > > > > provide
> > > > > > > metadata in a meaningful format instead of Map.
> > > > > >
> > > > > > I've replaced ConfigData with Map as you
> suggested.
> > > > > >
> > > > > >
> > > > > > > 3. For ZK, we would use ConfigProvider.get() without `keys` to
> get
> > > > all
> > > > > > > keys in the path. Do we have two get() methods since some
> providers
> > > > > need
> > > > > > > keys to be specified and some don't? How do we decide which
> one to
> > > > use?
> > > > > >
> > > > > > The ConfigProvider should be thought of like a Map interface and
> does
> > > > not
> > > > > > require that one signature of get() be preferred over the other.
> > > > KIP-226
> > > > > > can use get(String path) while Connect will use get(String path,
> > > > > > Set) since it knows which keys it is interested in.
> > > > > >
> > > > > >
> > > > > > A few more updates to the KIP:
> > > > > >
> > > > > > - I've elided the ConfigTransformer implementation as Colin
> > > suggested.
> > > > > > - The variable reference now looks like ${provider:[path:]key}
> where
> > > > the
> > > > > > 

Build failed in Jenkins: kafka-0.11.0-jdk7 #372

2018-06-11 Thread Apache Jenkins Server
See 


Changes:

[github] MINOR: update jackson dependencies (#5180)

--
[...truncated 761.55 KB...]
at java.io.RandomAccessFile.write(RandomAccessFile.java:550)
at 
org.gradle.internal.io.RandomAccessFileOutputStream.write(RandomAccessFileOutputStream.java:46)
at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at org.gradle.cache.internal.btree.ByteOutput.done(ByteOutput.java:62)
at 
org.gradle.cache.internal.btree.FileBackedBlockStore$BlockImpl.write(FileBackedBlockStore.java:200)
at 
org.gradle.cache.internal.btree.FileBackedBlockStore.write(FileBackedBlockStore.java:114)
... 22 more
Failed to execute 
org.gradle.cache.internal.AsyncCacheAccessDecoratedCache$2@47a1ca46.
org.gradle.api.UncheckedIOException: Could not add entry '-15196229975621799' 
to cache fileSnapshots.bin 
(
at 
org.gradle.cache.internal.btree.BTreePersistentIndexedCache.put(BTreePersistentIndexedCache.java:157)
at 
org.gradle.cache.internal.DefaultMultiProcessSafePersistentIndexedCache$2.run(DefaultMultiProcessSafePersistentIndexedCache.java:63)
at 
org.gradle.cache.internal.DefaultFileLockManager$DefaultFileLock.doWriteAction(DefaultFileLockManager.java:184)
at 
org.gradle.cache.internal.DefaultFileLockManager$DefaultFileLock.writeFile(DefaultFileLockManager.java:174)
at 
org.gradle.cache.internal.DefaultCacheAccess$UnitOfWorkFileAccess.writeFile(DefaultCacheAccess.java:462)
at 
org.gradle.cache.internal.DefaultMultiProcessSafePersistentIndexedCache.put(DefaultMultiProcessSafePersistentIndexedCache.java:61)
at 
org.gradle.cache.internal.AsyncCacheAccessDecoratedCache$2.run(AsyncCacheAccessDecoratedCache.java:54)
at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at 
org.gradle.cache.internal.CacheAccessWorker$2.run(CacheAccessWorker.java:184)
at org.gradle.internal.Factories$1.create(Factories.java:25)
at 
org.gradle.cache.internal.DefaultCacheAccess.useCache(DefaultCacheAccess.java:189)
at 
org.gradle.cache.internal.DefaultCacheAccess.useCache(DefaultCacheAccess.java:175)
at 
org.gradle.cache.internal.CacheAccessWorker.flushOperations(CacheAccessWorker.java:174)
at 
org.gradle.cache.internal.CacheAccessWorker.run(CacheAccessWorker.java:144)
at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at 
org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:46)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.gradle.api.UncheckedIOException: java.io.IOException: No space 
left on device
at 
org.gradle.cache.internal.btree.FileBackedBlockStore.write(FileBackedBlockStore.java:118)
at 
org.gradle.cache.internal.btree.CachingBlockStore.flush(CachingBlockStore.java:60)
at 
org.gradle.cache.internal.btree.FreeListBlockStore.flush(FreeListBlockStore.java:92)
at 
org.gradle.cache.internal.btree.StateCheckBlockStore.flush(StateCheckBlockStore.java:76)
at 
org.gradle.cache.internal.btree.BTreePersistentIndexedCache.put(BTreePersistentIndexedCache.java:155)
... 18 more
Caused by: java.io.IOException: No space left on device
at java.io.RandomAccessFile.writeBytes0(Native Method)
at java.io.RandomAccessFile.writeBytes(RandomAccessFile.java:520)
at java.io.RandomAccessFile.write(RandomAccessFile.java:550)
at 
org.gradle.internal.io.RandomAccessFileOutputStream.write(RandomAccessFileOutputStream.java:46)
at 
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.FilterOutputStream.flush(FilterOutputStream.java:140)
at org.gradle.cache.internal.btree.ByteOutput.done(ByteOutput.java:62)
at 
org.gradle.cache.internal.btree.FileBackedBlockStore$BlockImpl.write(FileBackedBlockStore.java:200)
at 
org.gradle.cache.internal.btree.FileBackedBlockStore.write(FileBackedBlockStore.java:114)
... 22 more
Failed to execute 
org.gradle.cache.internal.AsyncCacheAccessDecoratedCache$2@6bce8e6.
org.gradle.api.UncheckedIOException: Could not add entry '7644327230823428109' 
to cache fileSnapshots.bin 

[jira] [Created] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required

2018-06-11 Thread Dhruvil Shah (JIRA)
Dhruvil Shah created KAFKA-7045:
---

 Summary: Consumer may not be able to consume all messages when 
down-conversion is required
 Key: KAFKA-7045
 URL: https://issues.apache.org/jira/browse/KAFKA-7045
 Project: Kafka
  Issue Type: Bug
  Components: consumer, core
Affects Versions: 1.0.1, 1.1.0, 0.11.0.2, 1.0.0, 0.11.0.1, 0.11.0.0, 2.0.0
Reporter: Dhruvil Shah
 Fix For: 2.1.0


When down-conversion is required, the consumer might fail to consume messages 
under certain conditions. A couple of such cases are outlined below:
 # When consuming from a compacted topic, it is possible that the consumer 
wants to fetch messages that fall in the middle of a batch but the messages 
have been compacted by the cleaner. For example, let's say we have the 
following two segments. The brackets indicate a single batch of messages and 
the numbers within are the message offsets.

Segment #1: [0, 1, 2],  [3, 4, 5], [6, 7, 8]
Segment #2: [9, 10, 11], [12, 13, 14]

If the cleaner were to come in now and clean up messages with offsets 7 and 8, 
the segments would look like the following:

Segment #1: [0, 1, 2], [3, 4, 5], [6]
Segment #2: [9, 10, 11], [12, 13, 14]

A consumer attempting to fetch messages at offset 7 will start reading the 
batch starting at offset 6. During down-conversion, we will drop the record 
starting at 6 because it is less than the current fetch start offset. However, 
there are no messages in the log following offset 6. In such cases, we return 
the `FileRecords` itself, which would cause the consumer to throw an exception 
because it does not understand the stored message format.


 # When consuming from a topic with transactional messages, down-conversion 
usually drops control batches because these did not exist in V0 and V1 message 
formats. If there are no message batches following the control batch in the 
particular segment (or if we are at the end of the log), we would again get no 
records after down-conversion and will return the `FileRecords`. Because the 
consumer is not able to interpret control batches, it will again throw an 
exception.

Relevant code from 1.x release that sends `FileRecords` when we are not able to 
down-convert any messages:
```
public ConvertedRecords downConvert(byte toMagic, long firstOffset, Time time) {
    ConvertedRecords convertedRecords = downConvert(batches, toMagic, firstOffset, time);
    if (convertedRecords.recordsProcessingStats().numRecordsConverted() == 0) {
        // This indicates that the message is too large, which means that the buffer is not large
        // enough to hold a full record batch. We just return all the bytes in this instance.
        // Even though the record batch does not have the right format version, we expect old clients
        // to raise an error to the user after reading the record batch size and seeing that there
        // are not enough available bytes in the response to read it fully. Note that this is
        // only possible prior to KIP-74, after which the broker was changed to always return at least
        // one full record batch, even if it requires exceeding the max fetch size requested by the client.
        return new ConvertedRecords<>(this, RecordsProcessingStats.EMPTY);
    } else {
        return convertedRecords;
    }
}
```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7044) kafka-consumer-groups.sh NullPointerException describing round robin or sticky assignors

2018-06-11 Thread Jeff Field (JIRA)
Jeff Field created KAFKA-7044:
-

 Summary: kafka-consumer-groups.sh NullPointerException describing 
round robin or sticky assignors
 Key: KAFKA-7044
 URL: https://issues.apache.org/jira/browse/KAFKA-7044
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 1.1.0
 Environment: CentOS 7.4, Oracle JDK 1.8
Reporter: Jeff Field


We've recently moved to using the round robin assignor for one of our consumer 
groups, and started testing the sticky assignor. In both cases, using Kafka 
1.1.0 we get a null pointer exception *unless* the group being described is 
rebalancing:
{code:java}
kafka-consumer-groups --bootstrap-server fqdn:9092 --describe --group 
groupname-for-consumer

Error: Executing consumer group command failed due to null
[2018-06-12 01:32:34,179] DEBUG Exception in consumer group command 
(kafka.admin.ConsumerGroupCommand$)
java.lang.NullPointerException
at scala.Predef$.Long2long(Predef.scala:363)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:612)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$getLogEndOffsets$2.apply(ConsumerGroupCommand.scala:610)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.getLogEndOffsets(ConsumerGroupCommand.scala:610)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describePartitions(ConsumerGroupCommand.scala:328)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.collectConsumerAssignment(ConsumerGroupCommand.scala:308)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectConsumerAssignment(ConsumerGroupCommand.scala:544)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:571)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10$$anonfun$13.apply(ConsumerGroupCommand.scala:565)
at scala.collection.immutable.List.flatMap(List.scala:338)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:565)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService$$anonfun$10.apply(ConsumerGroupCommand.scala:558)
at scala.Option.map(Option.scala:146)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.collectGroupOffsets(ConsumerGroupCommand.scala:558)
at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describeGroup(ConsumerGroupCommand.scala:271)
at 
kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:544)
at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:77)
at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
[2018-06-12 01:32:34,255] DEBUG Removed sensor with name connections-closed: 
(org.apache.kafka.common.metrics.Metrics){code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position

2018-06-11 Thread Ted Yu
+1
 Original message 
From: Ismael Juma
Date: 6/11/18 5:43 PM (GMT-08:00)
To: dev
Subject: Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position
Sounds good to me.

Ismael

On Mon, Jun 11, 2018 at 5:38 PM Jason Gustafson  wrote:

> Hey All,
>
> I wanted to get to some closure on this issue before the release. I think
> the config `default.api.timeout.ms` sounds good to me. Guozhang and I had
> actually discussed using this name before we saw Colin's comment.
>
> As for the default value, the main reason that the current `
> request.timeout.ms` is so high for the consumer is that we have to handle
> the special case of the JoinGroup, which can currently sit in purgatory for
> as long as `max.poll.interval.ms`. I'd like to propose making that a
> special case so that the default value can be changed to something more
> reasonable. In other words, the timeout for JoinGroup will be tied to `
> max.poll.interval.ms` directly and we will use `request.timeout.ms` for
> everything else. For the default values, I would suggest 30s for `
> request.timeout.ms` and 60s for `default.api.timeout.ms`.
>
> How does that sound?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 8, 2018 at 10:39 AM, Colin McCabe  wrote:
>
> > On Wed, Jun 6, 2018, at 13:10, Guozhang Wang wrote:
> > > The reason that I'm hesitant to use the term "timeout" is that it's
> being
> > > over-used for multiple semantics: request RPC timeout, consumer session
> > > heartbeat liveness "timeout", and API blocking timeout. We can argue
> that
> > > in English both of them are indeed called a "timeout" value, but
> > personally
> > > I'm afraid for normal users having the same word `timeout` would be
> > > confusing, and hence I'm proposing to use a slightly different term.
> >
> > Hmm.  I can see why you want to have a different-sounding name from the
> > existing timeouts.  However, I think it could be less clear to omit the
> > word timeout.  If your operation times out, and you get a
> TimeoutException,
> > what configuration do you raise?  The timeout.  If the configuration name
> > doesn't tell you that it's a timeout, it's harder to understand what
> needs
> > to be changed.
> >
> > For example, if "group.min.session.timeout.ms" was called something
> like "
> > group.min.session.block.ms" or "group.min.session.heartbeat.ms", it
> would
> > not be as clear.
> >
> > > Comparing with adding a new config, I'm actually more concerned about
> > > leveraging the request.timeout value for a default blocking timeout,
> > since
> > > the default value is hard to decide, since for different blocking
> calls,
> > it
> > > may have different rpc round trips behind the scene, so simply setting
> it
> > > as request.timeout + a delta may not be always good enough.
> >
> > Yes, I agree that we need a new configuration key.  I don't think we
> > should try to hard-code this.
> >
> > I think we should just bite the bullet and create a new configuration key
> > like "default.api.timeout.ms" that sets the default timeout for API
> > calls.  The hard part is adding the new configuration in a way that
> doesn't
> > disrupt existing configurations.
> >
> > There are at least a few cases to worry about:
> >
> > 1. Someone uses the default (pretty long) timeouts for everything.
> > 2. Someone has configured a short request.timeout.ms, in an effort to
> see
> > failures more quickly
> > 3. Someone has configured a very long (or maybe infinite)
> > request.timeout.ms
> >
> > Case #2 is probably the one which is hardest to support well.  We could
> > probably do it with logic like this:
> >
> > A. If default.api.timeout.ms is explicitly set, we use that value.
> > otherwise...
> > B. If request.timeout.ms is longer than 2 minutes, we set
> > default.api.timeout.ms to request.timeout.ms + 1500.  otherwise...
> >  we set default.api.timeout.ms to request.timeout.ms
> >
> > best,
> > Colin
> >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jun 5, 2018 at 5:18 PM, Ted Yu  wrote:
> > >
> > > > I see where the 0.5 in your previous response came about.
> > > >
> > > > The reason I wrote 'request.timeout.ms + 15000' was that I treat
> this
> > > > value
> > > > in place of the default.block.ms
> > > > According to your earlier description:
> > > >
> > > > bq. request.timeout.ms controls something different: the amount of
> > time
> > > > we're willing to wait for an RPC to complete.
> > > >
> > > > Basically we're in agreement. It is just that figuring out good
> > default is
> > > > non-trivial.
> > > >
> > > > On Tue, Jun 5, 2018 at 4:44 PM, Colin McCabe 
> > wrote:
> > > >
> > > > > On Tue, Jun 5, 2018, at 16:35, Ted Yu wrote:
> > > > > > bq. could probably come up with a good default
> > > > > >
> > > > > > That's the difficult part.
> > > > > >
> > > > > > bq. max(1000, 0.5 * request.timeout.ms)
> > > > > >
> > > > > > Looking at some existing samples:
> > > > > > In tests/kafkatest/tests/connect/templates/connect-distributed.
> > > > 

Jenkins build is back to normal : kafka-1.1-jdk7 #141

2018-06-11 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position

2018-06-11 Thread Ismael Juma
Sounds good to me.

Ismael

On Mon, Jun 11, 2018 at 5:38 PM Jason Gustafson  wrote:

> Hey All,
>
> I wanted to get to some closure on this issue before the release. I think
> the config `default.api.timeout.ms` sounds good to me. Guozhang and I had
> actually discussed using this name before we saw Colin's comment.
>
> As for the default value, the main reason that the current `
> request.timeout.ms` is so high for the consumer is that we have to handle
> the special case of the JoinGroup, which can currently sit in purgatory for
> as long as `max.poll.interval.ms`. I'd like to propose making that a
> special case so that the default value can be changed to something more
> reasonable. In other words, the timeout for JoinGroup will be tied to `
> max.poll.interval.ms` directly and we will use `request.timeout.ms` for
> everything else. For the default values, I would suggest 30s for `
> request.timeout.ms` and 60s for `default.api.timeout.ms`.
>
> How does that sound?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 8, 2018 at 10:39 AM, Colin McCabe  wrote:
>
> > On Wed, Jun 6, 2018, at 13:10, Guozhang Wang wrote:
> > > The reason that I'm hesitant to use the term "timeout" is that it's
> being
> > > over-used for multiple semantics: request RPC timeout, consumer session
> > > heartbeat liveness "timeout", and API blocking timeout. We can argue
> that
> > > in English both of them are indeed called a "timeout" value, but
> > personally
> > > I'm afraid for normal users having the same word `timeout` would be
> > > confusing, and hence I'm proposing to use a slightly different term.
> >
> > Hmm.  I can see why you want to have a different-sounding name from the
> > existing timeouts.  However, I think it could be less clear to omit the
> > word timeout.  If your operation times out, and you get a
> TimeoutException,
> > what configuration do you raise?  The timeout.  If the configuration name
> > doesn't tell you that it's a timeout, it's harder to understand what
> needs
> > to be changed.
> >
> > For example, if "group.min.session.timeout.ms" was called something
> like "
> > group.min.session.block.ms" or "group.min.session.heartbeat.ms", it
> would
> > not be as clear.
> >
> > > Comparing with adding a new config, I'm actually more concerned about
> > > leveraging the request.timeout value for a default blocking timeout,
> > since
> > > the default value is hard to decide, since for different blocking
> calls,
> > it
> > > may have different rpc round trips behind the scene, so simply setting
> it
> > > as request.timeout + a delta may not be always good enough.
> >
> > Yes, I agree that we need a new configuration key.  I don't think we
> > should try to hard-code this.
> >
> > I think we should just bite the bullet and create a new configuration key
> > like "default.api.timeout.ms" that sets the default timeout for API
> > calls.  The hard part is adding the new configuration in a way that
> doesn't
> > disrupt existing configurations.
> >
> > There are at least a few cases to worry about:
> >
> > 1. Someone uses the default (pretty long) timeouts for everything.
> > 2. Someone has configured a short request.timeout.ms, in an effort to
> see
> > failures more quickly
> > 3. Someone has configured a very long (or maybe infinite)
> > request.timeout.ms
> >
> > Case #2 is probably the one which is hardest to support well.  We could
> > probably do it with logic like this:
> >
> > A. If default.api.timeout.ms is explicitly set, we use that value.
> > otherwise...
> > B. If request.timeout.ms is longer than 2 minutes, we set
> > default.api.timeout.ms to request.timeout.ms + 1500.  otherwise...
> >  we set default.api.timeout.ms to request.timeout.ms
> >
> > best,
> > Colin
> >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jun 5, 2018 at 5:18 PM, Ted Yu  wrote:
> > >
> > > > I see where the 0.5 in your previous response came about.
> > > >
> > > > The reason I wrote 'request.timeout.ms + 15000' was that I treat
> this
> > > > value
> > > > in place of the default.block.ms
> > > > According to your earlier description:
> > > >
> > > > bq. request.timeout.ms controls something different: the amount of
> > time
> > > > we're willing to wait for an RPC to complete.
> > > >
> > > > Basically we're in agreement. It is just that figuring out good
> > default is
> > > > non-trivial.
> > > >
> > > > On Tue, Jun 5, 2018 at 4:44 PM, Colin McCabe 
> > wrote:
> > > >
> > > > > On Tue, Jun 5, 2018, at 16:35, Ted Yu wrote:
> > > > > > bq. could probably come up with a good default
> > > > > >
> > > > > > That's the difficult part.
> > > > > >
> > > > > > bq. max(1000, 0.5 * request.timeout.ms)
> > > > > >
> > > > > > Looking at some existing samples:
> > > > > > In tests/kafkatest/tests/connect/templates/connect-distributed.
> > > > properties
> > > > > ,
> > > > > > we have:
> > > > > >   request.timeout.ms=3
> > > > > >
> > > > > > Isn't the above formula putting an upper bound 15000 for the RPC
> > > > timeout

Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position

2018-06-11 Thread Guozhang Wang
Re: special handling JoinGroup request so that we can use reasonable
request.timeout.ms, sounds great to me :)


Guozhang

On Mon, Jun 11, 2018 at 5:38 PM, Jason Gustafson  wrote:

> Hey All,
>
> I wanted to get to some closure on this issue before the release. I think
> the config `default.api.timeout.ms` sounds good to me. Guozhang and I had
> actually discussed using this name before we saw Colin's comment.
>
> As for the default value, the main reason that the current `
> request.timeout.ms` is so high for the consumer is that we have to handle
> the special case of the JoinGroup, which can currently sit in purgatory for
> as long as `max.poll.interval.ms`. I'd like to propose making that a
> special case so that the default value can be changed to something more
> reasonable. In other words, the timeout for JoinGroup will be tied to `
> max.poll.interval.ms` directly and we will use `request.timeout.ms` for
> everything else. For the default values, I would suggest 30s for `
> request.timeout.ms` and 60s for `default.api.timeout.ms`.
>
> How does that sound?
>
> Thanks,
> Jason
>
>
>
> On Fri, Jun 8, 2018 at 10:39 AM, Colin McCabe  wrote:
>
> > On Wed, Jun 6, 2018, at 13:10, Guozhang Wang wrote:
> > > The reason that I'm hesitant to use the term "timeout" is that it's
> being
> > > over-used for multiple semantics: request RPC timeout, consumer session
> > > heartbeat liveness "timeout", and API blocking timeout. We can argue
> that
> > > in English both of them are indeed called a "timeout" value, but
> > personally
> > > I'm afraid for normal users having the same word `timeout` would be
> > > confusing, and hence I'm proposing to use a slightly different term.
> >
> > Hmm.  I can see why you want to have a different-sounding name from the
> > existing timeouts.  However, I think it could be less clear to omit the
> > word timeout.  If your operation times out, and you get a
> TimeoutException,
> > what configuration do you raise?  The timeout.  If the configuration name
> > doesn't tell you that it's a timeout, it's harder to understand what
> needs
> > to be changed.
> >
> > For example, if "group.min.session.timeout.ms" was called something
> like "
> > group.min.session.block.ms" or "group.min.session.heartbeat.ms", it
> would
> > not be as clear.
> >
> > > Comparing with adding a new config, I'm actually more concerned about
> > > leveraging the request.timeout value for a default blocking timeout,
> > since
> > > the default value is hard to decide, since for different blocking
> calls,
> > it
> > > may have different rpc round trips behind the scene, so simply setting
> it
> > > as request.timeout + a delta may not be always good enough.
> >
> > Yes, I agree that we need a new configuration key.  I don't think we
> > should try to hard-code this.
> >
> > I think we should just bite the bullet and create a new configuration key
> > like "default.api.timeout.ms" that sets the default timeout for API
> > calls.  The hard part is adding the new configuration in a way that
> doesn't
> > disrupt existing configurations.
> >
> > There are at least a few cases to worry about:
> >
> > 1. Someone uses the default (pretty long) timeouts for everything.
> > 2. Someone has configured a short request.timeout.ms, in an effort to
> see
> > failures more quickly
> > 3. Someone has configured a very long (or maybe infinite)
> > request.timeout.ms
> >
> > Case #2 is probably the one which is hardest to support well.  We could
> > probably do it with logic like this:
> >
> > A. If default.api.timeout.ms is explicitly set, we use that value.
> > otherwise...
> > B. If request.timeout.ms is longer than 2 minutes, we set
> > default.api.timeout.ms to request.timeout.ms + 1500.  otherwise...
> >  we set default.api.timeout.ms to request.timeout.ms
> >
> > best,
> > Colin
> >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Jun 5, 2018 at 5:18 PM, Ted Yu  wrote:
> > >
> > > > I see where the 0.5 in your previous response came about.
> > > >
> > > > The reason I wrote 'request.timeout.ms + 15000' was that I treat
> this
> > > > value
> > > > in place of the default.block.ms
> > > > According to your earlier description:
> > > >
> > > > bq. request.timeout.ms controls something different: the amount of
> > time
> > > > we're willing to wait for an RPC to complete.
> > > >
> > > > Basically we're in agreement. It is just that figuring out good
> > default is
> > > > non-trivial.
> > > >
> > > > On Tue, Jun 5, 2018 at 4:44 PM, Colin McCabe 
> > wrote:
> > > >
> > > > > On Tue, Jun 5, 2018, at 16:35, Ted Yu wrote:
> > > > > > bq. could probably come up with a good default
> > > > > >
> > > > > > That's the difficult part.
> > > > > >
> > > > > > bq. max(1000, 0.5 * request.timeout.ms)
> > > > > >
> > > > > > Looking at some existing samples:
> > > > > > In tests/kafkatest/tests/connect/templates/connect-distributed.
> > > > properties
> > > > > ,
> > > > > > we have:
> > > > > >   request.timeout.ms=3
> > > > > 

Re: [VOTE] KIP-266: Add TimeoutException for KafkaConsumer#position

2018-06-11 Thread Jason Gustafson
Hey All,

I wanted to get to some closure on this issue before the release. I think
the config `default.api.timeout.ms` sounds good to me. Guozhang and I had
actually discussed using this name before we saw Colin's comment.

As for the default value, the main reason that the current `
request.timeout.ms` is so high for the consumer is that we have to handle
the special case of the JoinGroup, which can currently sit in purgatory for
as long as `max.poll.interval.ms`. I'd like to propose making that a
special case so that the default value can be changed to something more
reasonable. In other words, the timeout for JoinGroup will be tied to `
max.poll.interval.ms` directly and we will use `request.timeout.ms` for
everything else. For the default values, I would suggest 30s for `
request.timeout.ms` and 60s for `default.api.timeout.ms`.

How does that sound?

Thanks,
Jason
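
As a sketch of what this proposal amounts to (made-up helper names, not the actual consumer code):

```
// Illustrative only: the proposed defaults and the JoinGroup special case.
public final class TimeoutDefaultsSketch {

    // Proposed defaults from the message above.
    static final int REQUEST_TIMEOUT_MS = 30_000;      // request.timeout.ms
    static final int DEFAULT_API_TIMEOUT_MS = 60_000;  // default.api.timeout.ms

    // JoinGroup may legitimately sit in purgatory for up to max.poll.interval.ms,
    // so it gets its own RPC timeout; everything else uses request.timeout.ms.
    static int rpcTimeoutMs(String apiName, int requestTimeoutMs, int maxPollIntervalMs) {
        if ("JoinGroup".equals(apiName))
            return maxPollIntervalMs;
        return requestTimeoutMs;
    }
}
```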



On Fri, Jun 8, 2018 at 10:39 AM, Colin McCabe  wrote:

> On Wed, Jun 6, 2018, at 13:10, Guozhang Wang wrote:
> > The reason that I'm hesitant to use the term "timeout" is that it's being
> > over-used for multiple semantics: request RPC timeout, consumer session
> > heartbeat liveness "timeout", and API blocking timeout. We can argue that
> > in English both of them are indeed called a "timeout" value, but
> personally
> > I'm afraid for normal users having the same word `timeout` would be
> > confusing, and hence I'm proposing to use a slightly different term.
>
> Hmm.  I can see why you want to have a different-sounding name from the
> existing timeouts.  However, I think it could be less clear to omit the
> word timeout.  If your operation times out, and you get a TimeoutException,
> what configuration do you raise?  The timeout.  If the configuration name
> doesn't tell you that it's a timeout, it's harder to understand what needs
> to be changed.
>
> For example, if "group.min.session.timeout.ms" was called something like "
> group.min.session.block.ms" or "group.min.session.heartbeat.ms", it would
> not be as clear.
>
> > Comparing with adding a new config, I'm actually more concerned about
> > leveraging the request.timeout value for a default blocking timeout,
> since
> > the default value is hard to decide, since for different blocking calls,
> it
> > may have different rpc round trips behind the scene, so simply setting it
> > as request.timeout + a delta may not be always good enough.
>
> Yes, I agree that we need a new configuration key.  I don't think we
> should try to hard-code this.
>
> I think we should just bite the bullet and create a new configuration key
> like "default.api.timeout.ms" that sets the default timeout for API
> calls.  The hard part is adding the new configuration in a way that doesn't
> disrupt existing configurations.
>
> There are at least a few cases to worry about:
>
> 1. Someone uses the default (pretty long) timeouts for everything.
> 2. Someone has configured a short request.timeout.ms, in an effort to see
> failures more quickly
> 3. Someone has configured a very long (or maybe infinite)
> request.timeout.ms
>
> Case #2 is probably the one which is hardest to support well.  We could
> probably do it with logic like this:
>
> A. If default.api.timeout.ms is explicitly set, we use that value.
> otherwise...
> B. If request.timeout.ms is longer than 2 minutes, we set
> default.api.timeout.ms to request.timeout.ms + 1500.  otherwise...
>  we set default.api.timeout.ms to request.timeout.ms
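
Sketched as code (the helper name and config plumbing are assumptions, not the actual client implementation):

```
// Illustrative only: the A/B fallback rule described above.
final class ApiTimeoutFallbackSketch {
    static int resolveDefaultApiTimeoutMs(Integer explicitDefaultApiTimeoutMs, int requestTimeoutMs) {
        if (explicitDefaultApiTimeoutMs != null)
            return explicitDefaultApiTimeoutMs;   // A: an explicit default.api.timeout.ms wins
        if (requestTimeoutMs > 2 * 60 * 1000)
            return requestTimeoutMs + 1500;       // B: a long request timeout gets a small delta added
        return requestTimeoutMs;                  // otherwise mirror request.timeout.ms
    }
}
```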
>
> best,
> Colin
>
> >
> >
> > Guozhang
> >
> >
> > On Tue, Jun 5, 2018 at 5:18 PM, Ted Yu  wrote:
> >
> > > I see where the 0.5 in your previous response came about.
> > >
> > > The reason I wrote 'request.timeout.ms + 15000' was that I treat this
> > > value
> > > in place of the default.block.ms
> > > According to your earlier description:
> > >
> > > bq. request.timeout.ms controls something different: the amount of
> time
> > > we're willing to wait for an RPC to complete.
> > >
> > > Basically we're in agreement. It is just that figuring out good
> default is
> > > non-trivial.
> > >
> > > On Tue, Jun 5, 2018 at 4:44 PM, Colin McCabe 
> wrote:
> > >
> > > > On Tue, Jun 5, 2018, at 16:35, Ted Yu wrote:
> > > > > bq. could probably come up with a good default
> > > > >
> > > > > That's the difficult part.
> > > > >
> > > > > bq. max(1000, 0.5 * request.timeout.ms)
> > > > >
> > > > > Looking at some existing samples:
> > > > > In tests/kafkatest/tests/connect/templates/connect-distributed.
> > > properties
> > > > ,
> > > > > we have:
> > > > >   request.timeout.ms=3
> > > > >
> > > > > Isn't the above formula putting an upper bound 15000 for the RPC
> > > timeout
> > > > ?
> > > >
> > > > Correct.  It would put a 15 second default on the RPC timeout in this
> > > > case.  If that's too short, the user has the option to change it.
> > > >
> > > > If we feel that 15 seconds is too short, we could put a floor of 30
> > > > seconds or whatever on the RPC timeout, instead of 1 second.
> > > >
> > > > >
> 

Jenkins build is back to normal : kafka-2.0-jdk8 #11

2018-06-11 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-6906) Kafka Streams does not commit transactions if data is produced via wall-clock punctuation

2018-06-11 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-6906.

   Resolution: Fixed
Fix Version/s: 1.1.1
   2.0.0

> Kafka Streams does not commit transactions if data is produced via wall-clock 
> punctuation
> -
>
> Key: KAFKA-6906
> URL: https://issues.apache.org/jira/browse/KAFKA-6906
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Jagadesh Adireddi
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> Committing in Kafka Streams happens at regular intervals. However, a commit 
> only happens if new input records were processed since the last commit (tracked 
> via the flag `commitOffsetNeeded`, which is set within `StreamTask#process()`).
> Data could also be emitted via wall-clock based punctuation calls, though. 
> Especially if EOS is enabled, this is an issue (maybe also for non-EOS), 
> because the currently running transaction is not committed and thus might time 
> out, leading to a fatal error.
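
For illustration, the kind of processor that triggers this scenario, using the Processor API shape of Streams 1.x (the class name and the 60s interval are illustrative):
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockEmitter extends AbstractProcessor<String, String> {

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // Emit a record every 60s of wall-clock time. No input record is
        // processed on this path, so the "commit needed" flag is never set and
        // the currently open transaction is not committed at the commit interval.
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context().forward("tick", "emitted-at-" + timestamp));
    }

    @Override
    public void process(String key, String value) {
        context().forward(key, value);
    }
}
{code}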



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-297: Externalizing Secrets for Connect Configurations

2018-06-11 Thread Colin McCabe
Sounds good.  Thanks, Konstantine.

Colin


On Mon, Jun 11, 2018, at 13:41, Rajini Sivaram wrote:
> Hi Konstantine,
> 
> Sounds reasonable to me too.
> 
> Regards,
> 
> Rajini
> 
> On Mon, Jun 11, 2018 at 7:55 PM, Robert Yokota  wrote:
> 
> > Hi Konstantine,
> >
> > Sounds reasonable!
> >
> > Thanks,
> > Robert
> >
> > On Mon, Jun 11, 2018 at 11:49 AM, Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Hi everyone, after fixing an issue with a regular expression in Connect's
> > > class loading isolation of the new component type ConfigProvider here:
> > >
> > > https://github.com/apache/kafka/pull/5177
> > >
> > > I noticed that the new interface ConfigProvider, along with its first
> > > implementation FileConfigProvider, have been placed in the package:
> > >
> > > org.apache.kafka.common.config
> > >
> > > This specific package is mentioned in KIP-297 in a few places, but not in
> > > any code snippets. I'd like to suggest moving the interface and any current
> > > or future implementation classes to a new package named:
> > >
> > > org.apache.kafka.common.config.provider
> > >
> > > and update the KIP document accordingly.
> > >
> > > This seems to make sense in general. But, specifically, in Connect it is
> > > desired since we treat ConfigProvider implementations as Connect
> > components
> > > that are loaded in isolation. Having a package for config providers will
> > > allow us to avoid making any assumptions with respect to the name of a
> > > class that implements `ConfigProvider` and is included in Apache Kafka.
> > It
> > > will suffice for this class to reside in the package
> > > org.apache.kafka.common.config.provider.
> > >
> > > Let me know if this is a reasonable request and if you agree on amending
> > > the KIP description.
> > >
> > > - Konstantine
> > >
> > >
> > >
> > > On Wed, May 16, 2018 at 10:33 AM, Rajini Sivaram <
> > rajinisiva...@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the update, Robert. Looks good to me.
> > > >
> > > > Regards,
> > > >
> > > > Rajini
> > > >
> > > > On Wed, May 16, 2018 at 4:43 PM, Robert Yokota 
> > > wrote:
> > > >
> > > > > Hi Rajini,
> > > > >
> > > > > Thanks for the excellent feedback!
> > > > >
> > > > > I've made the API changes that you've requested in the KIP.
> > > > >
> > > > >
> > > > > > 1. Are we expecting one provider instance with different contexts
> > > > > > provided to `ConfigProvider.get()`? If we created a different
> > > provider
> > > > > > instance for each context, we could deal with scheduling reloads in
> > > the
> > > > > > provider implementation?
> > > > >
> > > > > Yes, there would be one provider instance.  I've collapsed the
> > > > > ConfigContext and the ConfigChangeCallback by adding a parameter
> > > delayMs
> > > > to
> > > > > indicate when the change will happen.  When a particular
> > ConfigProvider
> > > > > retrieves a lease duration along with a key, it can either 1)
> > > schedule a
> > > > > background thread to push out the change when it happens (at which
> > time
> > > > the
> > > > > delayMs will be 0), or 2) invoke the callback immediately with the lease
> > > > > duration set as delayMs (of course, in this case the values for the
> > > keys
> > > > > will be the old values).  A ConfigProvider could be parameterized to do
> > > one
> > > > > or the other.
> > > > >
> > > > >
> > > > > > 2. Couldn't ConfigData  be an interface that just returns a map of
> > > > > > key-value pairs. Providers that return metadata could extend it to
> > > > > provide
> > > > > > metadata in a meaningful format instead of Map.
> > > > >
> > > > > I've replaced ConfigData with Map as you suggested.
> > > > >
> > > > >
> > > > > > 3. For ZK, we would use ConfigProvider.get() without `keys` to get
> > > all
> > > > > > keys in the path. Do we have two get() methods since some providers
> > > > need
> > > > > > keys to be specified and some don't? How do we decide which one to
> > > use?
> > > > >
> > > > > The ConfigProvider should be thought of like a Map interface and does
> > > not
> > > > > require that one signature of get() be preferred over the other.
> > > KIP-226
> > > > > can use get(String path) while Connect will use get(String path,
> > > > > Set) since it knows which keys it is interested in.
> > > > >
> > > > >
> > > > > A few more updates to the KIP:
> > > > >
> > > > > - I've elided the ConfigTransformer implementation as Colin
> > suggested.
> > > > > - The variable reference now looks like ${provider:[path:]key} where
> > > the
> > > > > path is optional.
> > > > >
> > > > >
> > > > > Thanks!
> > > > > Robert
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 16, 2018 at 4:30 AM, Rajini Sivaram <
> > > rajinisiva...@gmail.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Robert,
> > > > > >
> > > > > > Thanks for the KIP updates.
> > > > > >
> > > > > > The interfaces look suitable for brokers, with some small changes.
> > If
> > > > we
> > > > > > can adapt the 

Jenkins build is back to normal : kafka-trunk-jdk10 #191

2018-06-11 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-7003) Add headers with error context in messages written to the Connect DeadLetterQueue topic

2018-06-11 Thread Ewen Cheslack-Postava (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-7003.
--
   Resolution: Fixed
Fix Version/s: 2.1.0
   2.0.0

Issue resolved by pull request 5159
[https://github.com/apache/kafka/pull/5159]

> Add headers with error context in messages written to the Connect 
> DeadLetterQueue topic
> ---
>
> Key: KAFKA-7003
> URL: https://issues.apache.org/jira/browse/KAFKA-7003
> Project: Kafka
>  Issue Type: Task
>Reporter: Arjun Satish
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
>
> This was added to the KIP after the feature freeze. 
> If the property {{errors.deadletterqueue.context.headers.enable}} is set 
> to {{*true*}}, the following headers will be added to the produced raw 
> message (only if they don't already exist in the message). All values will be 
> serialized as UTF-8 strings.
> ||Header Name||Description||
> |__connect.errors.topic|Name of the topic that contained the message.|
> |__connect.errors.task.id|The numeric ID of the task that encountered the error (encoded as a UTF-8 string).|
> |__connect.errors.stage|The name of the stage where the error occurred.|
> |__connect.errors.partition|The numeric ID of the partition in the original topic that contained the message (encoded as a UTF-8 string).|
> |__connect.errors.offset|The numeric value of the message offset in the original topic (encoded as a UTF-8 string).|
> |__connect.errors.exception.stacktrace|The stacktrace of the exception.|
> |__connect.errors.exception.message|The message in the exception.|
> |__connect.errors.exception.class.name|The fully qualified classname of the exception that was thrown during the execution.|
> |__connect.errors.connector.name|The name of the connector which encountered the error.|
> |__connect.errors.class.name|The fully qualified name of the class that caused the error.|
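
For illustration, a consumer of the dead letter queue topic could read this error context as follows (sketch only; subscription and record handling are elided):
{code:java}
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class DlqHeaderReader {
    public static void printErrorContext(ConsumerRecord<byte[], byte[]> record) {
        for (Header header : record.headers()) {
            // All values are serialized as UTF-8 strings, per the table above.
            if (header.key().startsWith("__connect.errors.")) {
                System.out.println(header.key() + " = "
                        + new String(header.value(), StandardCharsets.UTF_8));
            }
        }
    }
}
{code}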



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7043) Connect isolation whitelist does not include new primitive converters (KIP-305)

2018-06-11 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-7043:


 Summary: Connect isolation whitelist does not include new 
primitive converters (KIP-305)
 Key: KAFKA-7043
 URL: https://issues.apache.org/jira/browse/KAFKA-7043
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 2.0.0


KIP-305 added several new primitive converters, but the PR did not add them to 
the whitelist for the plugin isolation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions

2018-06-11 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-7042:


 Summary: Fall back to the old behavior when the broker is too old 
to recognize LIST_OFFSET versions
 Key: KAFKA-7042
 URL: https://issues.apache.org/jira/browse/KAFKA-7042
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires 
min_version to be 2 on the consumer client side. On the other hand, a broker 
no newer than 0.10.2 can only recognize LIST_OFFSET versions up to 1. In this 
case a consumer talking to such an old broker will throw an exception right away.

What we can improve, though, is that when the consumer realizes the broker does not 
recognize LIST_OFFSET version 2 or higher, it can fall back to the old behavior of 
READ_UNCOMMITTED, since the data on that broker should not have any txn markers 
anyway. By doing this we would lift the hard restriction that consumers with 
READ_COMMITTED cannot work with an older broker version (remember we are 
trying to achieve broker compatibility back to 0.10.0).
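
A sketch of the proposed fallback (the enum and method names are made up for the example, not the actual Fetcher code):
{code:java}
// Illustrative only.
final class ListOffsetFallbackSketch {
    enum IsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

    static IsolationLevel effectiveIsolationLevel(IsolationLevel configured,
                                                  short maxListOffsetVersionOnBroker) {
        // READ_COMMITTED needs LIST_OFFSET v2+; brokers <= 0.10.2 only speak up to v1.
        // Such brokers cannot hold transaction markers, so reading uncommitted is safe.
        if (configured == IsolationLevel.READ_COMMITTED && maxListOffsetVersionOnBroker < 2)
            return IsolationLevel.READ_UNCOMMITTED;
        return configured;
    }
}
{code}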



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7041) Using RocksDB bulk loading for StandbyTasks

2018-06-11 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-7041:
--

 Summary: Using RocksDB bulk loading for StandbyTasks
 Key: KAFKA-7041
 URL: https://issues.apache.org/jira/browse/KAFKA-7041
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


In KAFKA-5363 we introduced RocksDB bulk loading to speed up store recovery. We 
could do the same optimization for StandbyTasks to make them more efficient and 
to reduce the likelihood that StandbyTasks lag behind.
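
For illustration, "bulk loading" at the RocksDB level boils down to an options toggle like the following (RocksJava API; whether the Streams code uses exactly this call is an assumption on my part):
{code:java}
import org.rocksdb.Options;

public class BulkLoadOptionsSketch {
    public static Options bulkLoadOptions() {
        Options options = new Options();
        // Tunes RocksDB for large sequential writes (e.g. disables automatic
        // compactions), at the cost of reads until a manual compaction runs afterwards.
        options.prepareForBulkLoad();
        return options;
    }
}
{code}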



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: kafka ack=all and min-isr

2018-06-11 Thread Carl Samuelson
Thanks Ismael - that explains it.
Thanks for the link James - I will take a look.

On Sat, Jun 9, 2018 at 6:13 AM, James Cheng  wrote:

> I wrote a blog post on min.isr that explains it in more detail:
> https://logallthethings.com/2016/07/11/min-insync-replicas-what-does-it-
> actually-do/
>
> The post is 2 years old, but I think it's still correct.
>
> -James
>
> > On Jun 7, 2018, at 10:31 PM, Carl Samuelson 
> wrote:
> >
> > Hi
> >
> > Hopefully this is the correct email address and forum for this.
> > I asked this question on stack overflow, but did not get an answer:
> > https://stackoverflow.com/questions/50689177/kafka-ack-all-and-min-isr
> >
> > *Summary*
> >
> > The docs and code comments for Kafka suggest that when the producer
> setting
> > acks is set to all, then an ack will only be sent to the producer when *all
> > in-sync replicas have caught up*, but the code (Partition.Scala,
> > checkEnoughReplicasReachOffset) seems to suggest that the ack is sent as
> > soon as *min in-sync replicas have caught up*.
> >
> > *Details*
> >
> > The kafka docs have this:
> >
> > acks=all This means the leader will wait for the full set of in-sync
> > replicas to acknowledge the record. source
> > 
> >
> > Also, looking at the Kafka source code - partition.scala
> > checkEnoughReplicasReachOffset() has the following comment (emphasis
> mine):
> >
> > Note that this method will only be called if requiredAcks = -1 and we are
> > waiting for *all replicas*in ISR to be fully caught up to the (local)
> > leader's offset corresponding to this produce request before we
> acknowledge
> > the produce request.
> >
> > Finally, this answer  on
> Stack
> > Overflow (emphasis mine again)
> >
> > Also the min in-sync replica setting specifies the minimum number of
> > replicas that need to be in-sync for the partition to remain available
> for
> > writes. When a producer specifies ack (-1 / all config) it will still
> wait
> > for acks from *all in sync replicas* at that moment (independent of the
> > setting for min in-sync replicas).
> >
> > But when I look at the code in Partition.Scala (note minIsr <
> > curInSyncReplicas.size):
> >
> > def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
> >   ...
> >   val minIsr = leaderReplica.log.get.config.minInSyncReplicas
> >   if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
> >     if (minIsr <= curInSyncReplicas.size)
> >       (true, Errors.NONE)
> >
> > The code that calls this returns the ack:
> >
> > if (error != Errors.NONE || hasEnough) {
> >  status.acksPending = false
> >  status.responseStatus.error = error
> > }
> >
> > So, the code looks like it returns an ack as soon as the in-sync replica
> > set are greater than min in-sync replicas. However, the documentation and
> > comments suggest that the ack is only sent once all in-sync replicas have
> > caught up. What am I missing? At the very least, the comment above
> > checkEnoughReplicasReachOffset looks like it should be changed.
> > Regards,
> >
> > Carl
>
>


Re: [DISCUSS] KIP-297: Externalizing Secrets for Connect Configurations

2018-06-11 Thread Rajini Sivaram
Hi Konstantine,

Sounds reasonable to me too.

Regards,

Rajini

On Mon, Jun 11, 2018 at 7:55 PM, Robert Yokota  wrote:

> Hi Konstantine,
>
> Sounds reasonable!
>
> Thanks,
> Robert
>
> On Mon, Jun 11, 2018 at 11:49 AM, Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Hi everyone, after fixing an issue with a regular expression in Connect's
> > class loading isolation of the new component type ConfigProvider here:
> >
> > https://github.com/apache/kafka/pull/5177
> >
> > I noticed that the new interface ConfigProvider, along with its first
> > implementation FileConfigProvider, have been placed in the package:
> >
> > org.apache.kafka.common.config
> >
> > This specific package is mentioned in KIP-297 in a few places, but not in
> > any code snippets. I'd like to suggest moving the interface and any current
> > or future implementation classes to a new package named:
> >
> > org.apache.kafka.common.config.provider
> >
> > and update the KIP document accordingly.
> >
> > This seems to make sense in general. But, specifically, in Connect it is
> > desired since we treat ConfigProvider implementations as Connect
> components
> > that are loaded in isolation. Having a package for config providers will
> > allow us to avoid making any assumptions with respect to the name of a
> > class that implements `ConfigProvider` and is included in Apache Kafka.
> It
> > will suffice for this class to reside in the package
> > org.apache.kafka.common.config.provider.
> >
> > Let me know if this is a reasonable request and if you agree on amending
> > the KIP description.
> >
> > - Konstantine
> >
> >
> >
> > On Wed, May 16, 2018 at 10:33 AM, Rajini Sivaram <
> rajinisiva...@gmail.com>
> > wrote:
> >
> > > Thanks for the update, Robert. Looks good to me.
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > > On Wed, May 16, 2018 at 4:43 PM, Robert Yokota 
> > wrote:
> > >
> > > > Hi Rajini,
> > > >
> > > > Thanks for the excellent feedback!
> > > >
> > > > I've made the API changes that you've requested in the KIP.
> > > >
> > > >
> > > > > 1. Are we expecting one provider instance with different contexts
> > > > > provided to `ConfigProvider.get()`? If we created a different
> > provider
> > > > > instance for each context, we could deal with scheduling reloads in
> > the
> > > > > provider implementation?
> > > >
> > > > Yes, there would be one provider instance.  I've collapsed the
> > > > ConfigContext and the ConfigChangeCallback by adding a parameter
> > delayMs
> > > to
> > > > indicate when the change will happen.  When a particular
> ConfigProvider
> > > > retrieves a lease duration along with a key, it can either 1)
> > schedule a
> > > > background thread to push out the change when it happens (at which
> time
> > > the
> > > > delayMs will be 0), or 2) invoke the callback immediately with the lease
> > > > duration set as delayMs (of course, in this case the values for the
> > keys
> > > > will be the old values).  A ConfigProvider could be parameterized to do
> > one
> > > > or the other.
> > > >
> > > >
> > > > > 2. Couldn't ConfigData  be an interface that just returns a map of
> > > > > key-value pairs. Providers that return metadata could extend it to
> > > > provide
> > > > > metadata in a meaningful format instead of Map.
> > > >
> > > > I've replaced ConfigData with Map as you suggested.
> > > >
> > > >
> > > > > 3. For ZK, we would use ConfigProvider.get() without `keys` to get
> > all
> > > > > keys in the path. Do we have two get() methods since some providers
> > > need
> > > > > keys to be specified and some don't? How do we decide which one to
> > use?
> > > >
> > > > The ConfigProvider should be thought of like a Map interface and does
> > not
> > > > require that one signature of get() be preferred over the other.
> > KIP-226
> > > > can use get(String path) while Connect will use get(String path,
> > > > Set) since it knows which keys it is interested in.
> > > >
> > > >
> > > > A few more updates to the KIP:
> > > >
> > > > - I've elided the ConfigTransformer implementation as Colin
> suggested.
> > > > - The variable reference now looks like ${provider:[path:]key} where
> > the
> > > > path is optional.
> > > >
> > > >
> > > > Thanks!
> > > > Robert
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, May 16, 2018 at 4:30 AM, Rajini Sivaram <
> > rajinisiva...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > Hi Robert,
> > > > >
> > > > > Thanks for the KIP updates.
> > > > >
> > > > > The interfaces look suitable for brokers, with some small changes.
> If
> > > we
> > > > > can adapt the interface to implement the existing
> > DynamicBrokerConfig,
> > > > then
> > > > > we are good.
> > > > >
> > > > > With broker configs:
> > > > >
> > > > >1. We don't know what configs are in ZK since we allow custom
> > > configs.
> > > > >So we would use `ConfigProvider.get()` without specifying keys.
> > > > >2. We want to see all changes (i.e. changes under a path). We
> can
> > > 

[jira] [Resolved] (KAFKA-7005) Remove duplicate Java Resource class.

2018-06-11 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7005.

Resolution: Fixed

merged the PR to trunk and 2.0 branch.

> Remove duplicate Java Resource class.
> -
>
> Key: KAFKA-7005
> URL: https://issues.apache.org/jira/browse/KAFKA-7005
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|https://github.com/apache/kafka/pull/5117]...
> The o.a.k.c.request.Resource class could be dropped in favour of 
> o.a.k.c.config.ConfigResource.
> This will remove the duplication of `Resource` classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7040:
-

 Summary: The replica fetcher thread may truncate accepted messages 
during multiple fast leadership transitions
 Key: KAFKA-7040
 URL: https://issues.apache.org/jira/browse/KAFKA-7040
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang


Problem Statement:
Consider the scenario where there are two brokers, broker0, and broker1, and 
there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
the leader and broker0 as the follower. The following sequence of events 
happened on broker0

1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
broker1, and awaits to get the response
2. A LeaderAndISR request causes broker0 to become the leader for one partition 
t1p0, which in turn will remove the partition t1p0 from the replica fetcher 
thread
3. Broker0 accepts some messages from a producer
4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 
to become the follower for partition t1p0. This will cause the partition t1p0 
to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 receives a response for the 
LeaderEpoch request issued in step 1, and truncates the accepted messages in 
step 3.

The issue can be reproduced with the test from 
https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea

[1] Initially we set up broker0 to be the follower of two partitions instead of 
just one, to avoid the shutting down of the replica fetcher thread when it 
becomes idle.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk10 #190

2018-06-11 Thread Apache Jenkins Server
See 


Changes:

[lindong28] KAFKA-6946; Keep the session id for incremental fetch when fetch

--
[...truncated 2.01 MB...]

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testInner[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testInner[caching enabled = false] PASSED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = false] STARTED

org.apache.kafka.streams.integration.StreamTableJoinIntegrationTest > 
testLeft[caching enabled = false] PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenInputTopicAbsent STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenInputTopicAbsent PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenIntermediateTopicAbsent STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhenIntermediateTopicAbsent PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromFileAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromFileAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic PASSED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhileStreamsRunning STARTED

org.apache.kafka.streams.integration.ResetIntegrationTest > 
shouldNotAllowToResetWhileStreamsRunning PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuter[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeft[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = true] PASSED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = true] STARTED

org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 

Re: [DISCUSS] KIP-312: Add Overloaded StreamsBuilder Build Method to Accept java.util.Properties

2018-06-11 Thread John Roesler
Thanks Matthias,

I buy this reasoning.

-John

On Mon, Jun 11, 2018 at 12:48 PM, Matthias J. Sax 
wrote:

> @John: I don't think this is a good idea. `KafkaStreams` executes a
> `Topology` and should be agnostic to whether the topology was built manually or
> via `StreamsBuilder` (at least from my point of view).
>
> -Matthias
>
> On 6/11/18 9:53 AM, Guozhang Wang wrote:
> > Another implementation detail that we can consider: currently the
> > InternalTopologyBuilder#setApplicationId() is used because we do not
> have
> > such a mechanism to pass in configs to the topology building process.
> Once
> > we add such a mechanism we should consider removing this function as well.
> >
> >
> > Guozhang
> >
> > On Mon, Jun 11, 2018 at 9:51 AM, Guozhang Wang 
> wrote:
> >
> >> Hello Bill,
> >>
> >> While working on https://github.com/apache/kafka/pull/5163 I am
> wondering
> >> if we can hide this from the public API, to e.g. add an additional
> function
> >> in InternalTopologyBuilder or InternalStreamsBuilder (since in your
> current
> >> working PR we're reusing InternalStreamsBuilder for the logical plan
> >> generation) which can then be called inside KafkaStreams constructors?
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Jun 11, 2018 at 9:41 AM, John Roesler 
> wrote:
> >>
> >>> Hi Bill,
> >>>
> >>> Thanks for the KIP.
> >>>
> >>> Just a small thought. This new API will result in calls that look like
> >>> this:
> >>> new KafkaStreams(builder.build(props), props);
> >>>
> >>> Do you think that's a significant enough eyesore to warrant adding a
> new
> >>> KafkaStreams constructor taking a KStreamsBuilder like this:
> >>> new KafkaStreams(builder, props);
> >>>
> >>> such that it would internally call builder.build(props) ?
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>>
> >>>
> >>> On Fri, Jun 8, 2018 at 7:16 PM, Ted Yu  wrote:
> >>>
>  Since there're only two values for the optional optimization config
>  introduced by KAFKA-6935, I wonder whether the overloaded build method (with
>  Properties
>  instance) would make the config unnecessary.
> 
>  nit:
>  * @return @return the {@link Topology} that represents the specified
>  processing logic
> 
>  Double @return above.
> 
>  Cheers
> 
>  On Fri, Jun 8, 2018 at 3:20 PM, Bill Bejeck 
> wrote:
> 
> > All,
> >
> > I'd like to start the discussion for adding an overloaded method to
> > StreamsBuilder taking a java.util.Properties instance.
> >
> > The KIP is located here :
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 312%3A+Add+Overloaded+StreamsBuilder+Build+Method+
> > to+Accept+java.util.Properties
> >
> > I look forward to your comments.
> >
> > Thanks,
> > Bill
> >
> 
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
>
>


Re: [DISCUSS] KIP-297: Externalizing Secrets for Connect Configurations

2018-06-11 Thread Robert Yokota
Hi Konstantine,

Sounds reasonable!

Thanks,
Robert

On Mon, Jun 11, 2018 at 11:49 AM, Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi everyone, after fixing an issue with a regular expression in Connect's
> class loading isolation of the new component type ConfigProvider here:
>
> https://github.com/apache/kafka/pull/5177
>
> I noticed that the new interface ConfigProvider, along with its first
> implementation FileConfigProvider, have been placed in the package:
>
> org.apache.kafka.common.config
>
> This specific package is mentioned in KIP-297 in a few places, but not in
> any code snippets. I'd like to suggest moving the interface and any current
> or future implementation classes to a new package named:
>
> org.apache.kafka.common.config.provider
>
> and update the KIP document accordingly.
>
> This seems to make sense in general. But, specifically, in Connect it is
> desired since we treat ConfigProvider implementations as Connect components
> that are loaded in isolation. Having a package for config providers will
> allow us to avoid making any assumptions with respect to the name of a
> class that implements `ConfigProvider` and is included in Apache Kafka. It
> will suffice for this class to reside in the package
> org.apache.kafka.common.config.provider.
>
> Let me know if this is a reasonable request and if you agree on amending
> the KIP description.
>
> - Konstantine
>
>
>
> On Wed, May 16, 2018 at 10:33 AM, Rajini Sivaram 
> wrote:
>
> > Thanks for the update, Robert. Looks good to me.
> >
> > Regards,
> >
> > Rajini
> >
> > On Wed, May 16, 2018 at 4:43 PM, Robert Yokota 
> wrote:
> >
> > > Hi Rajini,
> > >
> > > Thanks for the excellent feedback!
> > >
> > > I've made the API changes that you've requested in the KIP.
> > >
> > >
> > > > 1. Are we expecting one provider instance with different contexts
> > > > provided to `ConfigProvider.get()`? If we created a different
> provider
> > > > instance for each context, we could deal with scheduling reloads in
> the
> > > > provider implementation?
> > >
> > > Yes, there would be one provider instance.  I've collapsed the
> > > ConfigContext and the ConfigChangeCallback by adding a parameter
> delayMs
> > to
> > > indicate when the change will happen.  When a particular ConfigProvider
> > > retrieves a lease duration along with a key, it can either 1)
> schedule a
> > > background thread to push out the change when it happens (at which time
> > the
> > > delayMs will be 0), or invoke the callback immediately with the lease
> > > duration set as delayMs (of course, in this case the values for the
> keys
> > > will be the old values).  A ConfigProvider could be parameterized to do
> one
> > > or the other.
> > >
> > >
> > > > 2. Couldn't ConfigData  be an interface that just returns a map of
> > > > key-value pairs. Providers that return metadata could extend it to
> > > provide
> > > > metadata in a meaningful format instead of Map.
> > >
> > > I've replaced ConfigData with Map as you suggested.
> > >
> > >
> > > > 3. For ZK, we would use ConfigProvider.get() without `keys` to get
> all
> > > > keys in the path. Do we have two get() methods since some providers
> > need
> > > > keys to be specified and some don't? How do we decide which one to
> use?
> > >
> > > The ConfigProvider should be thought of like a Map interface and does
> not
> > > require that one signature of get() be preferred over the other.
> KIP-226
> > > can use get(String path) while Connect will use get(String path,
> > > Set<String>) since it knows which keys it is interested in.
> > >
> > >
> > > A few more updates to the KIP:
> > >
> > > - I've elided the ConfigTransformer implementation as Colin suggested.
> > > - The variable reference now looks like ${provider:[path:]key} where
> the
> > > path is optional.
> > >
> > >
> > > Thanks!
> > > Robert
> > >
> > >
> > >
> > >
> > > On Wed, May 16, 2018 at 4:30 AM, Rajini Sivaram <
> rajinisiva...@gmail.com
> > >
> > > wrote:
> > >
> > > > Hi Robert,
> > > >
> > > > Thanks for the KIP updates.
> > > >
> > > > The interfaces look suitable for brokers, with some small changes. If
> > we
> > > > can adapt the interface to implement the existing
> DynamicBrokerConfig,
> > > then
> > > > we are good.
> > > >
> > > > With broker configs:
> > > >
> > > >1. We don't know what configs are in ZK since we allow custom
> > configs.
> > > >So we would use `ConfigProvider.get()` without specifying keys.
> > > >2. We want to see all changes (i.e. changes under a path). We can
> > deal
> > > >with this internally by ignoring `keys` and subscribing to
> > everything
> > > >3. We have two paths (one for per-broker config and another for
> > > default
> > > >config shared by all brokers). All methods should ideally provide
> > > path -
> > > >see changes suggested below.
> > > >4. Keys are not independent. We update in batches (e.g keystore +
> > > >password). We want to see batches of changes, not 

Build failed in Jenkins: kafka-2.0-jdk8 #10

2018-06-11 Thread Apache Jenkins Server
See 


Changes:

[lindong28] KAFKA-6946; Keep the session id for incremental fetch when fetch

--
[...truncated 481.75 KB...]

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.network.SocketServerTest > testGracefulClose STARTED

kafka.network.SocketServerTest > testGracefulClose PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > controlThrowable STARTED

kafka.network.SocketServerTest > controlThrowable PASSED

kafka.network.SocketServerTest > testRequestMetricsAfterStop STARTED

kafka.network.SocketServerTest > testRequestMetricsAfterStop PASSED

kafka.network.SocketServerTest > testConnectionIdReuse STARTED

kafka.network.SocketServerTest > testConnectionIdReuse PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testProcessorMetricsTags STARTED

kafka.network.SocketServerTest > testProcessorMetricsTags PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testConnectionId STARTED

kafka.network.SocketServerTest > testConnectionId PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > testNoOpAction STARTED

kafka.network.SocketServerTest > testNoOpAction PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > closingChannelException STARTED

kafka.network.SocketServerTest > closingChannelException PASSED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingInProgress STARTED

kafka.network.SocketServerTest > 
testSendActionResponseWithThrottledChannelWhereThrottlingInProgress PASSED

kafka.network.SocketServerTest > testIdleConnection STARTED

kafka.network.SocketServerTest > testIdleConnection PASSED

kafka.network.SocketServerTest > 
testClientDisconnectionWithStagedReceivesFullyProcessed STARTED

kafka.network.SocketServerTest > 
testClientDisconnectionWithStagedReceivesFullyProcessed PASSED

kafka.network.SocketServerTest > testZeroMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testZeroMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > testMetricCollectionAfterShutdown STARTED

kafka.network.SocketServerTest > testMetricCollectionAfterShutdown PASSED

kafka.network.SocketServerTest > testSessionPrincipal STARTED

kafka.network.SocketServerTest > testSessionPrincipal PASSED

kafka.network.SocketServerTest > configureNewConnectionException STARTED

kafka.network.SocketServerTest > configureNewConnectionException PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides PASSED

kafka.network.SocketServerTest > processNewResponseException STARTED

kafka.network.SocketServerTest > processNewResponseException PASSED

kafka.network.SocketServerTest > processCompletedSendException STARTED

kafka.network.SocketServerTest > processCompletedSendException PASSED

kafka.network.SocketServerTest > processDisconnectedException STARTED

kafka.network.SocketServerTest > processDisconnectedException PASSED

kafka.network.SocketServerTest > sendCancelledKeyException STARTED

kafka.network.SocketServerTest > sendCancelledKeyException PASSED

kafka.network.SocketServerTest > processCompletedReceiveException STARTED

kafka.network.SocketServerTest > processCompletedReceiveException PASSED

kafka.network.SocketServerTest > testSocketsCloseOnShutdown STARTED

kafka.network.SocketServerTest > testSocketsCloseOnShutdown PASSED

kafka.network.SocketServerTest > 
testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone STARTED

kafka.network.SocketServerTest > 
testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone PASSED

kafka.network.SocketServerTest > pollException STARTED

kafka.network.SocketServerTest > pollException PASSED

kafka.network.SocketServerTest > testSslSocketServer STARTED

kafka.network.SocketServerTest > testSslSocketServer PASSED

kafka.network.SocketServerTest > tooBigRequestIsRejected STARTED

kafka.network.SocketServerTest > tooBigRequestIsRejected PASSED

kafka.network.SocketServerTest > 
testNoOpActionResponseWithThrottledChannelWhereThrottlingInProgress STARTED

kafka.network.SocketServerTest > 

[jira] [Created] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned

2018-06-11 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7039:
--

 Summary: DelegatingClassLoader creates plugin instance even if its 
not Versioned
 Key: KAFKA-7039
 URL: https://issues.apache.org/jira/browse/KAFKA-7039
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.0.0
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar
 Fix For: 2.0.0


The versioned interface was introduced as part of 
[KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin].
 DelegatingClassLoader is now attempting to create an instance of all the 
plugins, even if it's not required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-297: Externalizing Secrets for Connect Configurations

2018-06-11 Thread Konstantine Karantasis
Hi everyone, after fixing an issue with a regular expression in Connect's
class loading isolation of the new component type ConfigProvider here:

https://github.com/apache/kafka/pull/5177

I noticed that the new interface ConfigProvider, along with its first
implementation FileConfigProvider, have been placed in the package:

org.apache.kafka.common.config

This specific package is mentioned in KIP-297 in a few places, but not in
any code snippets. I'd like to suggest moving the interface and any current
or future implementation classes to a new package named:

org.apache.kafka.common.config.provider

and update the KIP document accordingly.

This seems to make sense in general. But, specifically, in Connect it is
desired since we treat ConfigProvider implementations as Connect components
that are loaded in isolation. Having a package for config providers will
allow us to avoid making any assumptions with respect to the name of a
class that implements `ConfigProvider` and is included in Apache Kafka. It
will suffice for this class to reside in the package
org.apache.kafka.common.config.provider.
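
To make the package placement concrete, here is a minimal sketch (the class name
EnvVarConfigProvider and the get(path)/get(path, keys) shapes are illustrative only,
taken from the discussion in this thread rather than the final interface; it
deliberately does not implement ConfigProvider so it stays self-contained):

package org.apache.kafka.common.config.provider;  // the proposed new package

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class EnvVarConfigProvider {

    // Toy example: return all environment variables; the path is ignored here.
    public Map<String, String> get(String path) {
        return new HashMap<>(System.getenv());
    }

    // Toy example: return only the requested keys.
    public Map<String, String> get(String path, Set<String> keys) {
        Map<String, String> result = new HashMap<>();
        for (String key : keys) {
            String value = System.getenv(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }
}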

Let me know if this is a reasonable request and if you agree on amending
the KIP description.

- Konstantine



On Wed, May 16, 2018 at 10:33 AM, Rajini Sivaram 
wrote:

> Thanks for the update, Robert. Looks good to me.
>
> Regards,
>
> Rajini
>
> On Wed, May 16, 2018 at 4:43 PM, Robert Yokota  wrote:
>
> > Hi Rajini,
> >
> > Thanks for the excellent feedback!
> >
> > I've made the API changes that you've requested in the KIP.
> >
> >
> > > 1. Are we expecting one provider instance with different contexts
> > > provided to `ConfigProvider.get()`? If we created a different provider
> > > instance for each context, we could deal with scheduling reloads in the
> > > provider implementation?
> >
> > Yes, there would be one provider instance.  I've collapsed the
> > ConfigContext and the ConfigChangeCallback by adding a parameter delayMs
> to
> > indicate when the change will happen.  When a particular ConfigProvider
> > retrieves a lease duration along with a key, it can either 1)  schedule a
> > background thread to push out the change when it happens (at which time
> the
> > delayMs will be 0), or invoke the callback immediately with the lease
> > duration set as delayMs (of course, in this case the values for the keys
> > will be the old values).  A ConfigProvider could be parameterized to do one
> > or the other.
> >
> >
> > > 2. Couldn't ConfigData  be an interface that just returns a map of
> > > key-value pairs. Providers that return metadata could extend it to
> > provide
> > > metadata in a meaningful format instead of Map.
> >
> > I've replaced ConfigData with Map as you suggested.
> >
> >
> > > 3. For ZK, we would use ConfigProvider.get() without `keys` to get all
> > > keys in the path. Do we have two get() methods since some providers
> need
> > > keys to be specified and some don't? How do we decide which one to use?
> >
> > The ConfigProvider should be thought of like a Map interface and does not
> > require that one signature of get() be preferred over the other.  KIP-226
> > can use get(String path) while Connect will use get(String path,
> > Set<String>) since it knows which keys it is interested in.
> >
> >
> > A few more updates to the KIP:
> >
> > - I've elided the ConfigTransformer implementation as Colin suggested.
> > - The variable reference now looks like ${provider:[path:]key} where the
> > path is optional.
> >
> >
> > Thanks!
> > Robert
> >
> >
> >
> >
> > On Wed, May 16, 2018 at 4:30 AM, Rajini Sivaram  >
> > wrote:
> >
> > > Hi Robert,
> > >
> > > Thanks for the KIP updates.
> > >
> > > The interfaces look suitable for brokers, with some small changes. If
> we
> > > can adapt the interface to implement the existing DynamicBrokerConfig,
> > then
> > > we are good.
> > >
> > > With broker configs:
> > >
> > >1. We don't know what configs are in ZK since we allow custom
> configs.
> > >So we would use `ConfigProvider.get()` without specifying keys.
> > >2. We want to see all changes (i.e. changes under a path). We can
> deal
> > >with this internally by ignoring `keys` and subscribing to
> everything
> > >3. We have two paths (one for per-broker config and another for
> > default
> > >config shared by all brokers). All methods should ideally provide
> > path -
> > >see changes suggested below.
> > >4. Keys are not independent. We update in batches (e.g keystore +
> > >password). We want to see batches of changes, not individual
> changes.
> > We
> > >retrieve all values from a path when a change is detected. We can do
> > > this
> > >by ignoring values from the callback, but it would be better if the
> > >callback interface could be changed - see below.
> > >
> > >
> > > public interface ConfigProvider extends Configurable, Closeable {
> > >
> > > *//** KIP-226 will use this*
> > > ConfigData get(ConfigContext ctx, String path);
> > >
> > >

[jira] [Resolved] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-06-11 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6946.
-
Resolution: Fixed

> Keep the session id for incremental fetch when fetch responses are throttled 
> -
>
> Key: KAFKA-6946
> URL: https://issues.apache.org/jira/browse/KAFKA-6946
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Priority: Major
>
> The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
> INVALID_SESSION_ID if the response needs to be throttled due to quota 
> violation. If it is for incremental fetch, this will make the client reset 
> its session and send a full fetch request next time. This is not a 
> correctness issue, but it may affect performance when fetches are throttled.
> In case of incremental fetch, a throttled response should use the same 
> session id as before so that the next unthrottled response can be in the same 
> session. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-312: Add Overloaded StreamsBuilder Build Method to Accept java.util.Properties

2018-06-11 Thread Matthias J. Sax
@John: I don't think this is a good idea. `KafkaStreams` executes a
`Topology` and should be agnostic to whether the topology was built manually or
via `StreamsBuilder` (at least from my point of view).

-Matthias

On 6/11/18 9:53 AM, Guozhang Wang wrote:
> Another implementation detail that we can consider: currently the
> InternalTopologyBuilder#setApplicationId() is used because we do not have
> such a mechanism to pass in configs to the topology building process. Once
> we add such a mechanism we should consider removing this function as well.
> 
> 
> Guozhang
> 
> On Mon, Jun 11, 2018 at 9:51 AM, Guozhang Wang  wrote:
> 
>> Hello Bill,
>>
>> While working on https://github.com/apache/kafka/pull/5163 I am wondering
>> if we can hide this from the public API, to e.g. add an additional function
>> in InternalTopologyBuilder or InternalStreamsBuilder (since in your current
>> working PR we're reusing InternalStreamsBuilder for the logical plan
>> generation) which can then be called inside KafkaStreams constructors?
>>
>>
>> Guozhang
>>
>>
>> On Mon, Jun 11, 2018 at 9:41 AM, John Roesler  wrote:
>>
>>> Hi Bill,
>>>
>>> Thanks for the KIP.
>>>
>>> Just a small thought. This new API will result in calls that look like
>>> this:
>>> new KafkaStreams(builder.build(props), props);
>>>
>>> Do you think that's a significant enough eyesore to warrant adding a new
>>> KafkaStreams constructor taking a KStreamsBuilder like this:
>>> new KafkaStreams(builder, props);
>>>
>>> such that it would internally call builder.build(props) ?
>>>
>>> Thanks,
>>> -John
>>>
>>>
>>>
>>> On Fri, Jun 8, 2018 at 7:16 PM, Ted Yu  wrote:
>>>
 Since there're only two values for the optional optimization config
 introduced by KAFKA-6935, I wonder whether the overloaded build method (with
 Properties
 instance) would make the config unnecessary.

 nit:
 * @return @return the {@link Topology} that represents the specified
 processing logic

 Double @return above.

 Cheers

 On Fri, Jun 8, 2018 at 3:20 PM, Bill Bejeck  wrote:

> All,
>
> I'd like to start the discussion for adding an overloaded method to
> StreamsBuilder taking a java.util.Properties instance.
>
> The KIP is located here :
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 312%3A+Add+Overloaded+StreamsBuilder+Build+Method+
> to+Accept+java.util.Properties
>
> I look forward to your comments.
>
> Thanks,
> Bill
>

>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 
> 





Re: [DISCUSS] KIP-312: Add Overloaded StreamsBuilder Build Method to Accept java.util.Properties

2018-06-11 Thread Guozhang Wang
Another implementation detail that we can consider: currently the
InternalTopologyBuilder#setApplicationId() is used because we do not have
such a mechanism to pass in configs to the topology building process. Once
we add such a mechanism we should consider removing this function as well.


Guozhang

On Mon, Jun 11, 2018 at 9:51 AM, Guozhang Wang  wrote:

> Hello Bill,
>
> While working on https://github.com/apache/kafka/pull/5163 I am wondering
> if we can hide this from the public API, to e.g. add an additional function
> in InternalTopologyBuilder or InternalStreamsBuilder (since in your current
> working PR we're reusing InternalStreamsBuilder for the logical plan
> generation) which can then be called inside KafkaStreams constructors?
>
>
> Guozhang
>
>
> On Mon, Jun 11, 2018 at 9:41 AM, John Roesler  wrote:
>
>> Hi Bill,
>>
>> Thanks for the KIP.
>>
>> Just a small thought. This new API will result in calls that look like
>> this:
>> new KafkaStreams(builder.build(props), props);
>>
>> Do you think that's a significant enough eyesore to warrant adding a new
>> KafkaStreams constructor taking a KStreamsBuilder like this:
>> new KafkaStreams(builder, props);
>>
>> such that it would internally call builder.build(props) ?
>>
>> Thanks,
>> -John
>>
>>
>>
>> On Fri, Jun 8, 2018 at 7:16 PM, Ted Yu  wrote:
>>
>> > Since there're only two values for the optional optimization config
>> > introduced by KAFKA-6935, I wonder whether the overloaded build method (with
>> > Properties
>> > instance) would make the config unnecessary.
>> >
>> > nit:
>> > * @return @return the {@link Topology} that represents the specified
>> > processing logic
>> >
>> > Double @return above.
>> >
>> > Cheers
>> >
>> > On Fri, Jun 8, 2018 at 3:20 PM, Bill Bejeck  wrote:
>> >
>> > > All,
>> > >
>> > > I'd like to start the discussion for adding an overloaded method to
>> > > StreamsBuilder taking a java.util.Properties instance.
>> > >
>> > > The KIP is located here :
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > 312%3A+Add+Overloaded+StreamsBuilder+Build+Method+
>> > > to+Accept+java.util.Properties
>> > >
>> > > I look forward to your comments.
>> > >
>> > > Thanks,
>> > > Bill
>> > >
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-312: Add Overloaded StreamsBuilder Build Method to Accept java.util.Properties

2018-06-11 Thread Guozhang Wang
Hello Bill,

While working on https://github.com/apache/kafka/pull/5163 I am wondering
if we can hide this from the public API, to e.g. add an additional function
in InternalTopologyBuilder or InternalStreamsBuilder (since in your current
working PR we're reusing InternalStreamsBuilder for the logical plan
generation) which can then be called inside KafkaStreams constructors?


Guozhang


On Mon, Jun 11, 2018 at 9:41 AM, John Roesler  wrote:

> Hi Bill,
>
> Thanks for the KIP.
>
> Just a small thought. This new API will result in calls that look like
> this:
> new KafkaStreams(builder.build(props), props);
>
> Do you think that's a significant enough eyesore to warrant adding a new
> KafkaStreams constructor taking a KStreamsBuilder like this:
> new KafkaStreams(builder, props);
>
> such that it would internally call builder.build(props) ?
>
> Thanks,
> -John
>
>
>
> On Fri, Jun 8, 2018 at 7:16 PM, Ted Yu  wrote:
>
> > Since there're only two values for the optional optimization config
> > introduced by KAFKA-6935, I wonder whether the overloaded build method (with
> > Properties
> > instance) would make the config unnecessary.
> >
> > nit:
> > * @return @return the {@link Topology} that represents the specified
> > processing logic
> >
> > Double @return above.
> >
> > Cheers
> >
> > On Fri, Jun 8, 2018 at 3:20 PM, Bill Bejeck  wrote:
> >
> > > All,
> > >
> > > I'd like to start the discussion for adding an overloaded method to
> > > StreamsBuilder taking a java.util.Properties instance.
> > >
> > > The KIP is located here :
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 312%3A+Add+Overloaded+StreamsBuilder+Build+Method+
> > > to+Accept+java.util.Properties
> > >
> > > I look forward to your comments.
> > >
> > > Thanks,
> > > Bill
> > >
> >
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-312: Add Overloaded StreamsBuilder Build Method to Accept java.util.Properties

2018-06-11 Thread John Roesler
Hi Bill,

Thanks for the KIP.

Just a small thought. This new API will result in calls that look like this:
new KafkaStreams(builder.build(props), props);

Do you think that's a significant enough eyesore to warrant adding a new
KafkaStreams constructor taking a KStreamsBuilder like this:
new KafkaStreams(builder, props);

such that it would internally call builder.build(props) ?
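
For concreteness, a usage sketch of what this would look like end to end (just a
sketch; the optimization config key and value below are assumptions based on the
KAFKA-6935 discussion, not final names):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class Kip312Example {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kip-312-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put("topology.optimization", "all");  // config name assumed from KAFKA-6935

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        // Proposed overload: the same Properties drive topology building and the runtime.
        KafkaStreams streams = new KafkaStreams(builder.build(props), props);
        streams.start();
    }
}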

Thanks,
-John



On Fri, Jun 8, 2018 at 7:16 PM, Ted Yu  wrote:

> Since there're only two values for the optional optimization config
> introduced by KAFKA-6935, I wonder whether the overloaded build method (with
> Properties
> instance) would make the config unnecessary.
>
> nit:
> * @return @return the {@link Topology} that represents the specified
> processing logic
>
> Double @return above.
>
> Cheers
>
> On Fri, Jun 8, 2018 at 3:20 PM, Bill Bejeck  wrote:
>
> > All,
> >
> > I'd like to start the discussion for adding an overloaded method to
> > StreamsBuilder taking a java.util.Properties instance.
> >
> > The KIP is located here :
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 312%3A+Add+Overloaded+StreamsBuilder+Build+Method+
> > to+Accept+java.util.Properties
> >
> > I look forward to your comments.
> >
> > Thanks,
> > Bill
> >
>


Re: [DISCUSS] KIP-313: Add KStream.flatTransform and KStream.flatTransformValues

2018-06-11 Thread Bill Bejeck
Thanks for the KIP, it's a +1 for me.  Looking forward to having a look at
the PR.



On Mon, Jun 11, 2018 at 9:58 AM Damian Guy  wrote:

> Thanks for the KIP. LGTM
>
> On Mon, 11 Jun 2018 at 00:44 Matthias J. Sax 
> wrote:
>
> > Thanks for the KIP.
> >
> > I don't have any comments at this point. Overall I am +1 on the KIP.
> >
> >
> > -Matthias
> >
> > On 6/8/18 2:56 PM, Bruno Cadonna wrote:
> > > Hi list,
> > >
> > > I created KIP-313 [1] for JIRA issue KAFKA-4217 [2] and I would like to
> > > put the KIP up for discussion.
> > >
> > > Best regards,
> > > Bruno
> > >
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues
> > >
> > > [2] https://issues.apache.org/jira/browse/KAFKA-4217
> > >
> >
> >
>


[jira] [Created] (KAFKA-7038) Support AdminClient Example

2018-06-11 Thread darion yaphet (JIRA)
darion yaphet created KAFKA-7038:


 Summary: Support AdminClient Example
 Key: KAFKA-7038
 URL: https://issues.apache.org/jira/browse/KAFKA-7038
 Project: Kafka
  Issue Type: New Feature
  Components: admin
Reporter: darion yaphet






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Apache Kafka 2.0.0 Release Progress

2018-06-11 Thread Rajini Sivaram
Hi all,

I have moved out most of the JIRAs that aren't currently being worked on.
We still have 24 JIRAs with 2.0.0 as the fix version (
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820).
Six of these are marked as blockers. Since KIP freeze was delayed by a day
due to holidays, code freeze will be on 13th of June. I will be aiming to
create the first RC for 2.0.0 on 14th of June UK morning time. Please help
to close out the remaining JIRAs before that.

Thank you!

Regards,

Rajini


Re: [DISCUSS] KIP-110: Add Codec for ZStandard Compression (Updated)

2018-06-11 Thread Dongjin Lee
I greatly appreciate your comprehensive reasoning, so: +1 for b for now.

For the license issues, I will have a check on how the other projects are
doing and share the results.

Best,
Dongjin

On Mon, Jun 11, 2018 at 10:08 PM Viktor Somogyi 
wrote:

> Hi Dongjin,
>
> A couple of comments:
> I would vote for option b. in the "backward compatibility" section. My
> reasoning for this is that users upgrading to a zstd compatible version
> won't start to use it automatically, so manual reconfiguration is required.
> Therefore an upgrade won't mess up the cluster. If not all the clients are
> upgraded but just some of them and they'd start to use zstd then it would
> cause errors in the cluster. I'd like to presume though that this is a very
> obvious failure case and nobody should be surprised if it didn't work.
> I wouldn't choose a. as I think we should bump the fetch and produce
> requests if it's a change in the message format. Moreover if some of the
> producers and the brokers are upgraded but some of the consumers are not,
> then we wouldn't prevent the error when the old consumer tries to consume
> the zstd compressed messages.
> I wouldn't choose c. either as I think binding the compression type to an
> API is not so obvious from the developer's perspective.
>
> I would also prefer to use the existing binding, however we must respect
> the licenses:
> "The code for these JNI bindings is licenced under 2-clause BSD license.
> The native Zstd library is licensed under 3-clause BSD license and GPL2"
> Based on the FAQ page
> https://www.apache.org/legal/resolved.html#category-a
> we may use 2- and 3-clause BSD licenses but the Apache license is not
> compatible with GPL2. I'm hoping that the "3-clause BSD license and GPL2"
> is really not an AND but an OR in this case, but I'm no lawyer, just wanted
> to make the point that we should watch out for licenses. :)
>
> Regards,
> Viktor
>
>
> On Sun, Jun 10, 2018 at 3:02 AM Ivan Babrou  wrote:
>
> > Hello,
> >
> > This is Ivan and I still very much support the fact that zstd compression
> > should be included out of the box.
> >
> > Please think about the environment, you can save quite a lot of hardware
> > with it.
> >
> > Thank you.
> >
> > On Sat, Jun 9, 2018 at 14:14 Dongjin Lee  wrote:
> >
> > > Since there are no responses for a week, I decided to reinitiate the
> > > discussion thread.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression
> > >
> > > This KIP is about to introduce ZStandard Compression into Apache Kafka.
> > > The reason why it is posted again has a story: It was originally posted
> > to
> > > the dev mailing list more than one year ago but since it has no
> > performance
> > > report included, it was postponed. But some people (including
> Ivan)
> > > reported excellent performance results with the draft PR, this work is
> now
> > > reactivated.
> > >
> > > The updated KIP document includes some expected problems and their
> > > candidate alternatives. Please have a look when you are free, and give
> > me a
> > > feedback. All kinds of participating are welcome.
> > >
> > > Best,
> > > Dongjin
> > >
> > > --
> > > *Dongjin Lee*
> > >
> > > *A hitchhiker in the mathematical world.*
> > >
> > > *github:  github.com/dongjinleekr
> > > linkedin:
> > kr.linkedin.com/in/dongjinleekr
> > > slideshare:
> > www.slideshare.net/dongjinleekr
> > > *
> > >
> >
>
-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*

*github:  github.com/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
slideshare:
www.slideshare.net/dongjinleekr
*


Re: [DISCUSS] KIP-313: Add KStream.flatTransform and KStream.flatTransformValues

2018-06-11 Thread Damian Guy
Thanks for the KIP. LGTM

On Mon, 11 Jun 2018 at 00:44 Matthias J. Sax  wrote:

> Thanks for the KIP.
>
> I don't have any comments at this point. Overall I am +1 on the KIP.
>
>
> -Matthias
>
> On 6/8/18 2:56 PM, Bruno Cadonna wrote:
> > Hi list,
> >
> > I created KIP-313 [1] for JIRA issue KAFKA-4217 [2] and I would like to
> > put the KIP up for discussion.
> >
> > Best regards,
> > Bruno
> >
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues
> >
> > [2] https://issues.apache.org/jira/browse/KAFKA-4217
> >
>
>


Re: [DISCUSS] KIP-110: Add Codec for ZStandard Compression (Updated)

2018-06-11 Thread Viktor Somogyi
Hi Dongjin,

A couple of comments:
I would vote for option b. in the "backward compatibility" section. My
reasoning for this is that users upgrading to a zstd compatible version
won't start to use it automatically, so manual reconfiguration is required.
Therefore an upgrade won't mess up the cluster. If not all the clients are
upgraded but just some of them and they'd start to use zstd then it would
cause errors in the cluster. I'd like to presume though that this is a very
obvious failure case and nobody should be surprised if it didn't work.
I wouldn't choose a. as I think we should bump the fetch and produce
requests if it's a change in the message format. Moreover if some of the
producers and the brokers are upgraded but some of the consumers are not,
then we wouldn't prevent the error when the old consumer tries to consume
the zstd compressed messages.
I wouldn't choose c. either as I think binding the compression type to an
API is not so obvious from the developer's perspective.
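
As a concrete sketch of what option b implies for clients (nothing changes until a
producer is explicitly reconfigured; the "zstd" value below is an assumption until
the KIP settles the final name):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdOptInExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Explicit opt-in: only producers reconfigured like this start sending zstd batches.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records as usual; brokers and consumers must also support zstd.
        }
    }
}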

I would also prefer to use the existing binding, however we must respect
the licenses:
"The code for these JNI bindings is licenced under 2-clause BSD license.
The native Zstd library is licensed under 3-clause BSD license and GPL2"
Based on the FAQ page https://www.apache.org/legal/resolved.html#category-a
we may use 2- and 3-clause BSD licenses but the Apache license is not
compatible with GPL2. I'm hoping that the "3-clause BSD license and GPL2"
is really not an AND but an OR in this case, but I'm no lawyer, just wanted
to make the point that we should watch out for licenses. :)

Regards,
Viktor


On Sun, Jun 10, 2018 at 3:02 AM Ivan Babrou  wrote:

> Hello,
>
> This is Ivan and I still very much support the fact that zstd compression
> should be included out of the box.
>
> Please think about the environment, you can save quite a lot of hardware
> with it.
>
> Thank you.
>
> On Sat, Jun 9, 2018 at 14:14 Dongjin Lee  wrote:
>
> > Since there are no responses for a week, I decided to reinitiate the
> > discussion thread.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression
> >
> > This KIP is about to introduce ZStandard Compression into Apache Kafka.
> > The reason why it is posted again has a story: It was originally posted
> to
> > the dev mailing list more than one year ago but since it has no
> performance
report included, it was postponed. But some people (including Ivan)
reported excellent performance results with the draft PR, so this work is now
> > reactivated.
> >
> > The updated KIP document includes some expected problems and their
> > candidate alternatives. Please have a look when you are free, and give
> me a
> > feedback. All kinds of participating are welcome.
> >
> > Best,
> > Dongjin
> >
> > --
> > *Dongjin Lee*
> >
> > *A hitchhiker in the mathematical world.*
> >
> > *github:  github.com/dongjinleekr
> > linkedin:
> kr.linkedin.com/in/dongjinleekr
> > slideshare:
> www.slideshare.net/dongjinleekr
> > *
> >
>


[jira] [Created] (KAFKA-7037) Kafka doesn't allow to delete topic with '+' in the name

2018-06-11 Thread Sandeep Nemuri (JIRA)
Sandeep Nemuri created KAFKA-7037:
-

 Summary: Kafka doesn't allow to delete topic with '+' in the name
 Key: KAFKA-7037
 URL: https://issues.apache.org/jira/browse/KAFKA-7037
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.1.0
Reporter: Sandeep Nemuri


 
{code:java}
[kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
test+topic
Created topic "test+topic".
[kafka@localhost ~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --list
__consumer_offsets
test+topic

[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --delete --topic test+topic
Error while executing topic command : Topic test+topic does not exist on ZK 
path ssltester-3.openstacklocal:2181
[2018-06-11 09:36:32,989] ERROR java.lang.IllegalArgumentException: Topic 
test+topic does not exist on ZK path ssltester-3.openstacklocal:2181
 at kafka.admin.TopicCommand$.deleteTopic(TopicCommand.scala:166)
 at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
 at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)
{code}
The major issue is that while executing a delete command, the kafka cli tool 
removes the "+" symbol and deletes the incorrect topic. In the case below, if 
_"*test+topic"*_ is deleted, kafka deletes _*testtopic.*_
{code:java}
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create 
--zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic 
testtopic
Created topic "testtopic".
[kafka@localhost~]$ /usr/hdp/current/kafka-broker/bin/kafka-topics.sh 
--zookeeper `hostname`:2181 --topic test+topic --delete
Topic testtopic is marked for deletion.{code}
 

It seems topic creation doesn't check for '+' and topic deletion strips '+' from 
the topic name.

Create Topic: 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L77-L85]
 

Delete Topic : 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/TopicFilter.scala#L28-L33]
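
A minimal illustration (not Kafka code, just a demonstration of the suspected 
mechanism, consistent with the TopicFilter link above): if the supplied topic name 
is treated as a regular expression, '+' becomes a quantifier, so "test+topic" 
matches "testtopic" but not the literal topic name.

{code:java}
import java.util.regex.Pattern;

public class PlusTopicRegexDemo {
    public static void main(String[] args) {
        Pattern filter = Pattern.compile("test+topic");               // '+' means "one or more 't'"
        System.out.println(filter.matcher("testtopic").matches());    // true  -> the wrong topic matches
        System.out.println(filter.matcher("test+topic").matches());   // false -> the intended topic does not
    }
}
{code}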

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7036) Complete the docs of KafkaConsumer#poll

2018-06-11 Thread Chia-Ping Tsai (JIRA)
Chia-Ping Tsai created KAFKA-7036:
-

 Summary: Complete the docs of KafkaConsumer#poll
 Key: KAFKA-7036
 URL: https://issues.apache.org/jira/browse/KAFKA-7036
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


KafkaConsumer#poll has nice docs about the expected exceptions. However, it 
lacks a description of SerializationException. Another minor issue is that 
KafkaConsumer doesn't catch all types of exceptions which may be thrown by the 
deserializer (see below). We should use Throwable instead of RuntimeException 
so as to catch all exceptions and then wrap them in a SerializationException.
{code:java}
private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                                         RecordBatch batch,
                                         Record record) {
    try {
        long offset = record.offset();
        long timestamp = record.timestamp();
        TimestampType timestampType = batch.timestampType();
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
        K key = keyBytes == null ? null : this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
        ByteBuffer valueBytes = record.value();
        byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
        V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
        return new ConsumerRecord<>(partition.topic(), partition.partition(), offset,
                                    timestamp, timestampType, record.checksumOrNull(),
                                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
                                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length,
                                    key, value, headers);
    } catch (RuntimeException e) {
        throw new SerializationException("Error deserializing key/value for partition " + partition +
                " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
    }
}
{code}
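
A minimal sketch of the proposed direction (a hypothetical helper for illustration, 
not the actual patch): wrap anything thrown by the deserializer, not just 
RuntimeException, in a SerializationException.

{code:java}
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical helper, for illustration only.
final class SafeDeserialization {
    static <T> T deserialize(Deserializer<T> deserializer, String topic, byte[] data) {
        try {
            return data == null ? null : deserializer.deserialize(topic, data);
        } catch (Throwable t) {
            // Catch Throwable so that Errors thrown by custom deserializers are wrapped too.
            throw new SerializationException("Error deserializing record from topic " + topic, t);
        }
    }
}
{code}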



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7035) Kafka Processor's init() method sometimes is not called

2018-06-11 Thread Oleksandr Konopko (JIRA)
Oleksandr Konopko created KAFKA-7035:


 Summary: Kafka Processor's init() method sometimes is not called
 Key: KAFKA-7035
 URL: https://issues.apache.org/jira/browse/KAFKA-7035
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Oleksandr Konopko


Scenario:

1. We have processing of Kafka Topic which is implemented with Processor API

2. We want to collect metrics (let's say just count the number of processed entities 
for simplicity)

3. How we tried to organize this:
 * process data with the process() method and send it down the stream with the context
 * on each call of the process() method, update the counter
 * schedule a punctuate function which will send the metric to a special topic. The metric 
is built from the counter

You can find the code (we removed all business-sensitive code out of it, so it 
should be easy to read) in the attachment; a reconstructed sketch is included below.
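
Since the attachment may not be visible here, a minimal sketch reconstructed from 
the description above (class name, punctuation interval, and the metric wiring are 
assumptions, not the original code):

{code:java}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Sketch only: counts processed entities and periodically emits the counter.
public class CountingProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private long counter = 0L;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        // The punctuation scheduled here is what emits the metric; if init() is
        // never called (the behaviour reported below), the metric is never sent.
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME,
                timestamp -> context.forward("count", Long.toString(counter)));
    }

    @Override
    public void process(String key, String value) {
        counter++;                    // update the metric counter
        context.forward(key, value);  // send the entity down the stream
    }

    @Override
    public void punctuate(long timestamp) { }  // deprecated no-op in the 1.0.x API

    @Override
    public void close() { }
}
{code}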

 

Problematic Kafka Streams behaviour that I can see by logging every step:

1. We have 80 messages in the input topic

2. Kafka Streams creates 4 Processor instances. Lets name them: ProcessorA, 
ProcessorB, ProcessorC and ProcessorD

3. ProcessorA and ProcessorB receive 1-5% of data. Data is processed correctly, 
results are sent down the stream. Counter is updated

4. init() method was not called for ProcessorA and ProcessorB

5. ProcessorC and ProcessorD are created and they start to receive all the rest 
of data. 95-99%

6. init() method is called for both ProcessorC and ProcessorD. It initiates 
punctuation, which causes the Metrics message to be created and sent down the metric 
stream periodically

7. ProcessorA and ProcessorB are closed. init() was never called for them. So 
the Metric entity was not sent to the metrics topic

8. Processing is finished.

 

In the end:

Expected:
 * 80 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to 80

Actual results:
 * 80 entities were processed and sent to the Sink
 * Metrics entities contain counters which sum up to some number 3-6% less than 
80, for example 786543

 

Problem:
 * init() method call is not guaranteed
 * there is no way to guarantee that all work was done by punctuate method 
before close()

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7034) Remove the duplicated listTopics from Consumer

2018-06-11 Thread darion yaphet (JIRA)
darion yaphet created KAFKA-7034:


 Summary: Remove the duplicated listTopics from Consumer 
 Key: KAFKA-7034
 URL: https://issues.apache.org/jira/browse/KAFKA-7034
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: darion yaphet


Both AdminClient and Consumer include the listTopics method, and they both 
use the Cluster instance to get the topic names. They are very similar.

So I think we should remove the Consumer's listTopics method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7033) Modify AbstractOptions's timeoutMs as Long type

2018-06-11 Thread darion yaphet (JIRA)
darion yaphet created KAFKA-7033:


 Summary: Modify AbstractOptions's timeoutMs as Long type
 Key: KAFKA-7033
 URL: https://issues.apache.org/jira/browse/KAFKA-7033
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 1.1.0
Reporter: darion yaphet


Currently AbstractOptions's timeoutMs is an Integer; using Long to represent 
the timeout in milliseconds may be better.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Support Kafka Broker Status

2018-06-11 Thread ????????????
Hi team:
I think we should support a tool to display the kafka cluster brokers' status.
Maybe I should create a KIP (Kafka Improvement Proposal)? Or create a JIRA
directly?
thanks ~

[jira] [Created] (KAFKA-7032) The TimeUnit is neglected by KafkaConsumer#close(long, TimeUnit)

2018-06-11 Thread Chia-Ping Tsai (JIRA)
Chia-Ping Tsai created KAFKA-7032:
-

 Summary: The TimeUnit is neglected by KafkaConsumer#close(long, 
TimeUnit)
 Key: KAFKA-7032
 URL: https://issues.apache.org/jira/browse/KAFKA-7032
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.0.0
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai
 Fix For: 2.0.0


{code:java}
@Deprecated
@Override
public void close(long timeout, TimeUnit timeUnit) {
    close(Duration.ofMillis(TimeUnit.MILLISECONDS.toMillis(timeout)));
}
{code}
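
A sketch of the likely fix (an assumption, not the committed patch): honor the 
caller-supplied unit instead of hard-coding MILLISECONDS.

{code:java}
@Deprecated
@Override
public void close(long timeout, TimeUnit timeUnit) {
    // Convert using the unit the caller actually passed in.
    close(Duration.ofMillis(timeUnit.toMillis(timeout)));
}
{code}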
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-206: Add support for UUID serialization and deserialization

2018-06-11 Thread Jakub Scholz
Hi Matthias,

There was some guy who took this KIP over - he had an open PR and
everything, so I let him do it. But from the PR it seems that he has now
abandoned it (https://github.com/apache/kafka/pull/4438) - he hasn't
responded in a while. I will have a look at it so that it can be fixed and
merged and we can close this KIP.

Shame I didn't notice it earlier; it is now too late for 2.0.0 :-(.

Thanks & Regards
Jakub


On Mon, Jun 11, 2018 at 2:33 AM Matthias J. Sax 
wrote:

> Jakub,
>
> it seems that you got enough votes.
>
> I am closing the KIP as accepted with
>
>  - 3 binding votes (Ewen, Rajini, Damian)
>  - 4 non-binding (Ted, Colin, Manikumar, Mickael
>  - one non-binding -1 (Jan)
>
>
> Feel free to update the PR -- it might need a rebase.
>
>
>
> -Matthias
>
>
> On 5/11/18 3:31 PM, Damian Guy wrote:
> > +1 (binding)
> >
> > On Fri, 4 May 2018 at 08:55 Rajini Sivaram 
> wrote:
> >
> >> Hi Brandon,
> >>
> >> +1 (binding)
> >>
> >> Thanks for the KIP!
> >>
> >> Regards,
> >>
> >> Rajini
> >>
> >> On Fri, May 4, 2018 at 2:14 PM, Brandon Kirchner <
> >> brandon.kirch...@gmail.com
> >>> wrote:
> >>
> >>> I'd like to resurrect this one last time, any chance we can get some
> more
> >>> binding votes and move this forward?
> >>>
> >>> Brandon K.
> >>>
> >>> On Fri, Feb 2, 2018 at 11:35 AM, Colin McCabe 
> >> wrote:
> >>>
>  Hi Brandon,
> 
>  I think people are generally busy working on the upcoming release now.
>  Sorry for the inconvenience.
> 
>  best,
> 
> 
>  On Fri, Feb 2, 2018, at 07:33, Brandon Kirchner wrote:
> > I'd really like this to get 2 more binding votes. If that doesn't
> >>> happen,
> > how / can this still move forward? Not sure what the procedure is...
> >
> > Brandon K.
> >
> > On Tue, Jan 30, 2018 at 9:56 AM, Mickael Maison <
>  mickael.mai...@gmail.com>
> > wrote:
> >
> >> +1 (non binding)
> >> Thanks for the KIP
> >>
> >> On Tue, Jan 30, 2018 at 9:49 AM, Manikumar <
> >>> manikumar.re...@gmail.com>
> >> wrote:
> >>> +1 (non-binding)
> >>>
> >>> On Tue, Jan 30, 2018 at 11:50 AM, Ewen Cheslack-Postava <
> >> e...@confluent.io>
> >>> wrote:
> >>>
>  +1 (binding)
> 
>  On Fri, Jan 26, 2018 at 9:16 AM, Colin McCabe <
> >> cmcc...@apache.org
> 
> >> wrote:
> 
> > +1 (non-binding)
> >
> >
> >
> > On Fri, Jan 26, 2018, at 08:29, Ted Yu wrote:
> >> +1
> >>
> >> On Fri, Jan 26, 2018 at 7:00 AM, Brandon Kirchner <
> >> brandon.kirch...@gmail.com> wrote:
> >>
> >>> Hi all,
> >>>
> >>> I would like to (re)start the voting process for KIP-206:
> >>>
> >>> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>> 206%3A+Add+support+for+UUID+serialization+and+
> >>> deserialization
> >>>  >>> 206%3A+Add+support+for+UUID+serialization+and+
>  deserialization>*
> >>>
> >>> The KIP adds a UUID serializer and deserializer. Possible
> > implementation
> >>> can be seen here --
> >>>
> >>> https://github.com/apache/kafka/pull/4438
> >>>
> >>> Original discussion and voting thread can be seen here --
> >>> http://search-hadoop.com/m/Kafka/uyzND1dlgePJY7l9?subj=+
> >>> DISCUSS+KIP+206+Add+support+for+UUID+serialization+and+
>  deserialization
> >>>
> >>>
> >>> Thanks!
> >>> Brandon K.
> >>>
> >
> 
> >>
> 
> >>>
> >>
> >
>
>