Re: [VOTE] 2.1.0 RC1

2018-11-16 Thread Dong Lin
Hey Eno, Vahid and everyone,

Thanks for reporting the test error!
https://builds.apache.org/job/kafka-2.1-jdk8/ shows the list of recent unit
test runs. 7 out of 10 recent runs have passed all tests. Each of the other
three runs shows one unique flaky test failure.

I have opened the umbrella JIRA https://issues.apache.org/jira/browse/KAFKA-7645
to track these flaky tests. There are currently 7 flaky tests reported in
either https://builds.apache.org/job/kafka-2.1-jdk8/ or the voting
threads. Among these 7 flaky tests, 3 failed due to issues in the test
logic, and 3 are related to SSL with similar failures in the 2.0 branch, which
has been running well. So these 6 tests should not be blocking issues for the
2.1.0 release.

Regarding the other test failure,
LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
(KAFKA-7647): I did due
diligence to understand the failure but could not find any bug. Since this
test failure happens rarely and I could not find any issue by looking at
the stacktrace, I am again inclined not to consider this a blocking issue.
We can discuss more if there is a different opinion on the mailing list.

Thanks,
Dong


On Thu, Nov 15, 2018 at 10:28 PM Guozhang Wang  wrote:

> +1 (binding).
>
> I've verified the signature, and ran quickstart / unit test with scala 2.12
> binary.
>
> On my local laptop the unit tests did not fail, though on Jenkins they do
> indeed seem flaky.
>
> Guozhang
>
> On Fri, Nov 9, 2018 at 3:33 PM Dong Lin  wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the second candidate for feature release of Apache Kafka 2.1.0.
> >
> > This is a major version release of Apache Kafka. It includes 28 new KIPs and
> > critical bug fixes. Please see the Kafka 2.1.0 release plan for more
> > details:
> >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=91554044
> >
> > Here are a few notable highlights:
> >
> > - Java 11 support
> > - Support for Zstandard, which achieves compression comparable to gzip
> with
> > higher compression and especially decompression speeds (KIP-110)
> > - Avoid expiring committed offsets for active consumer group (KIP-211)
> > - Provide Intuitive User Timeouts in The Producer (KIP-91)
> > - Kafka's replication protocol now supports improved fencing of zombies.
> > Previously, under certain rare conditions, if a broker became partitioned
> > from Zookeeper but not the rest of the cluster, then the logs of
> replicated
> > partitions could diverge and cause data loss in the worst case (KIP-320)
> > - Streams API improvements (KIP-319, KIP-321, KIP-330, KIP-353, KIP-356)
> > - Admin script and admin client API improvements to simplify admin
> > operation (KIP-231, KIP-308, KIP-322, KIP-324, KIP-338, KIP-340)
> > - DNS handling improvements (KIP-235, KIP-302)
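
For reference, enabling the new Zstandard codec from KIP-110 only requires
setting the producer compression type to "zstd"; a minimal sketch (topic name
and broker address are illustrative, not from the release notes):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // New in 2.1.0: producer batches are compressed with Zstandard (KIP-110).
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // topic is illustrative
        }
    }
}
{code}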
> >
> > Release notes for the 2.1.0 release:
> > http://home.apache.org/~lindong/kafka-2.1.0-rc0/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Thursday, Nov 15, 12 pm PT ***
> >
> > * Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~lindong/kafka-2.1.0-rc1/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> >
> > * Javadoc:
> > http://home.apache.org/~lindong/kafka-2.1.0-rc1/javadoc/
> >
> > * Tag to be voted upon (off 2.1 branch) is the 2.1.0-rc1 tag:
> > https://github.com/apache/kafka/tree/2.1.0-rc1
> >
> > * Documentation:
> > http://kafka.apache.org/21/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/21/protocol.html
> >
> > * Successful Jenkins builds for the 2.1 branch:
> > Unit/integration tests:
> > https://builds.apache.org/job/kafka-2.1-jdk8/50/
> >
> > Please test and verify the release artifacts and submit a vote for this
> RC,
> > or report any issues so we can fix them and get a new RC out ASAP.
> Although
> > this release vote requires PMC votes to pass, testing, votes, and bug
> > reports are valuable and appreciated from everyone.
> >
> > Cheers,
> > Dong
> >
>
>
> --
> -- Guozhang
>


[jira] [Resolved] (KAFKA-7402) Kafka Streams should implement AutoCloseable where appropriate

2018-11-16 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-7402.

   Resolution: Fixed
Fix Version/s: 2.2.0

> Kafka Streams should implement AutoCloseable where appropriate
> --
>
> Key: KAFKA-7402
> URL: https://issues.apache.org/jira/browse/KAFKA-7402
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Yishun Guan
>Priority: Minor
>  Labels: needs-kip, newbie
> Fix For: 2.2.0
>
>
> Various components in Streams have close methods but do not implement 
> AutoCloseable. This means that they can't be used in try-with-resources 
> blocks.
> Remedying that would simplify our tests and make life easier for users as 
> well.
> KafkaStreams itself is a notable example of this, but we can take the 
> opportunity to look for other components that make sense as AutoCloseable as 
> well.
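
For illustration, a minimal sketch of the usage this change enables once
KafkaStreams implements AutoCloseable (per the Fix Version above, 2.2.0);
the topology, topic names, and configs are illustrative, not from the ticket:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class TryWithResourcesStreams {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "autocloseable-demo");   // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // illustrative

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                       // illustrative topology

        // With AutoCloseable, close() is invoked automatically when the try
        // block exits -- no explicit finally block is needed.
        try (KafkaStreams streams = new KafkaStreams(builder.build(), props)) {
            streams.start();
            Thread.sleep(10_000L); // run briefly for the sake of the example
        }
    }
}
{code}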



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #3206

2018-11-16 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] * MINOR: Catching null pointer exception for empty leader URL when

[colin] KAFKA-7402: Implement KIP-376 AutoCloseable additions

--
[...truncated 2.49 MB...]
kafka.utils.LoggingTest > testLogName STARTED

kafka.utils.LoggingTest > testLogName PASSED

kafka.utils.LoggingTest > testLogNameOverride STARTED

kafka.utils.LoggingTest > testLogNameOverride PASSED

kafka.utils.ZkUtilsTest > testGetSequenceIdMethod STARTED

kafka.utils.ZkUtilsTest > testGetSequenceIdMethod PASSED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testAbortedConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testGetAllPartitionsTopicWithoutPartitions STARTED

kafka.utils.ZkUtilsTest > testGetAllPartitionsTopicWithoutPartitions PASSED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath STARTED

kafka.utils.ZkUtilsTest > testSuccessfulConditionalDeletePath PASSED

kafka.utils.ZkUtilsTest > testPersistentSequentialPath STARTED

kafka.utils.ZkUtilsTest > testPersistentSequentialPath PASSED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing STARTED

kafka.utils.ZkUtilsTest > testClusterIdentifierJsonParsing PASSED

kafka.utils.ZkUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ZkUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.CoreUtilsTest > testGenerateUuidAsBase64 STARTED

kafka.utils.CoreUtilsTest > testGenerateUuidAsBase64 PASSED

kafka.utils.CoreUtilsTest > testAbs STARTED

kafka.utils.CoreUtilsTest > testAbs PASSED

kafka.utils.CoreUtilsTest > testReplaceSuffix STARTED

kafka.utils.CoreUtilsTest > testReplaceSuffix PASSED

kafka.utils.CoreUtilsTest > testCircularIterator STARTED

kafka.utils.CoreUtilsTest > testCircularIterator PASSED

kafka.utils.CoreUtilsTest > testReadBytes STARTED

kafka.utils.CoreUtilsTest > testReadBytes PASSED

kafka.utils.CoreUtilsTest > testCsvList STARTED

kafka.utils.CoreUtilsTest > testCsvList PASSED

kafka.utils.CoreUtilsTest > testReadInt STARTED

kafka.utils.CoreUtilsTest > testReadInt PASSED

kafka.utils.CoreUtilsTest > testAtomicGetOrUpdate STARTED

kafka.utils.CoreUtilsTest > testAtomicGetOrUpdate PASSED

kafka.utils.CoreUtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.CoreUtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.CoreUtilsTest > testCsvMap STARTED

kafka.utils.CoreUtilsTest > testCsvMap PASSED

kafka.utils.CoreUtilsTest > testInLock STARTED

kafka.utils.CoreUtilsTest > testInLock PASSED

kafka.utils.CoreUtilsTest > testTryAll STARTED

kafka.utils.CoreUtilsTest > testTryAll PASSED

kafka.utils.CoreUtilsTest > testSwallow STARTED

kafka.utils.CoreUtilsTest > testSwallow PASSED

kafka.utils.json.JsonValueTest > testJsonObjectIterator STARTED

kafka.utils.json.JsonValueTest > testJsonObjectIterator PASSED

kafka.utils.json.JsonValueTest > testDecodeLong STARTED

kafka.utils.json.JsonValueTest > testDecodeLong PASSED

kafka.utils.json.JsonValueTest > testAsJsonObject STARTED

kafka.utils.json.JsonValueTest > testAsJsonObject PASSED

kafka.utils.json.JsonValueTest > testDecodeDouble STARTED

kafka.utils.json.JsonValueTest > testDecodeDouble PASSED

kafka.utils.json.JsonValueTest > testDecodeOption STARTED

kafka.utils.json.JsonValueTest > testDecodeOption PASSED

kafka.utils.json.JsonValueTest > testDecodeString STARTED

kafka.utils.json.JsonValueTest > testDecodeString PASSED

kafka.utils.json.JsonValueTest > testJsonValueToString STARTED

kafka.utils.json.JsonValueTest > testJsonValueToString PASSED

kafka.utils.json.JsonValueTest > testAsJsonObjectOption STARTED

kafka.utils.json.JsonValueTest > testAsJsonObjectOption PASSED

kafka.utils.json.JsonValueTest > testAsJsonArrayOption STARTED

kafka.utils.json.JsonValueTest > testAsJsonArrayOption PASSED

kafka.utils.json.JsonValueTest > testAsJsonArray STARTED

kafka.utils.json.JsonValueTest > testAsJsonArray PASSED

kafka.utils.json.JsonValueTest > testJsonValueHashCode STARTED

kafka.utils.json.JsonValueTest > testJsonValueHashCode PASSED

kafka.utils.json.JsonValueTest > testDecodeInt STARTED

kafka.utils.json.JsonValueTest > testDecodeInt PASSED

kafka.utils.json.JsonValueTest > testDecodeMap STARTED

kafka.utils.json.JsonValueTest > testDecodeMap PASSED

kafka.utils.json.JsonValueTest > testDecodeSeq STARTED

kafka.utils.json.JsonValueTest > testDecodeSeq PASSED

kafka.utils.json.JsonValueTest > testJsonObjectGet STARTED

kafka.utils.json.JsonValueTest > testJsonObjectGet PASSED

kafka.utils.json.JsonValueTest > testJsonValueEquals STARTED

kafka.utils.json.JsonValueTest > testJsonValueEquals PASSED

kafka.utils.json.JsonValueTest > testJsonArrayIterator STARTED

kafka.utils.json.JsonValueTest > testJsonArrayIterator PASSED

kafka.utils.json.JsonValueTest > testJsonObjectApply STARTED

kafka.utils.json.JsonValueTest > testJsonObjectApply PASSED


[jira] [Created] (KAFKA-7651) Flaky test SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7651:
---

 Summary: Flaky test 
SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts
 Key: KAFKA-7651
 URL: https://issues.apache.org/jira/browse/KAFKA-7651
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


Here is the stacktrace from 
https://builds.apache.org/job/kafka-2.1-jdk8/51/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testMinimumRequestTimeouts/

{code}
Error Message
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
Stacktrace
java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at 
kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1404)
at 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-11-16 Thread Guozhang Wang
Thanks Florian! I will take a look at the PR.



On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois 
wrote:

> Hi Matthias,
>
> Sorry I was absent for a while. I have started a new PR for this KIP. It is
> still in progress for now. I'm working on it.
> https://github.com/apache/kafka/pull/5909
>
> On Fri, Oct 19, 2018 at 8:13 PM Matthias J. Sax wrote:
>
> > What is the status of this KIP?
> >
> > -Matthias
> >
> > On 7/19/18 5:17 PM, Guozhang Wang wrote:
> > > Hello Florian,
> > >
> > > Sorry for being late... I find myself apologizing for late replies
> > > these days. But I do want to push this KIP's progress forward, as I see it
> > > as a very important and helpful feature for extensibility.
> > >
> > > About the exceptions, I've gone through them and hopefully it is an
> > > exhaustive list:
> > >
> > > 1. KTable#toStream()
> > > 2. KStream#merge(KStream)
> > > 3. KStream#process() / transform() / transformValues()
> > > 4. KGroupedTable / KGroupedStream#count()
> > >
> > >
> > > Here's my reasoning:
> > >
> > > * It is okay not to let users override the name for 1/2, since they are
> > > too trivial to be useful for debugging, plus their processor names would
> > > not determine any related topic / store names.
> > > * For 3, I'd vote for adding overloaded functions with Named.
> > > * For 4, if users really want to name the processor they can call
> > > aggregate() instead, so I think it is okay to skip this case.
> > >
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <
> > fhussonn...@gmail.com>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> The option #3 seems to be a good alternative and I find the API more
> > >> elegant (thanks John).
> > >>
> > >> But, we still have the need to overload some methods either because
> > they do
> > >> not accept an action instance or because they are translated to
> multiple
> > >> processors.
> > >>
> > >> For example, this is the case for methods branch() and merge(). We
> could
> > >> introduce a new interface Named (or maybe a different name ?) with a
> > method
> > >> name(). All action interfaces could extend this one to implement the
> > option
> > >> 3).
> > >> This would result in the following overloads:
> > >>
> > >> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
> > >> KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super
> > >> V>... predicates)
> > >>
> > >> N.B : The list above is  not exhaustive
> > >>
> > >> -
> > >> user's code will become :
> > >>
> > >> KStream stream = builder.stream("test");
> > >> KStream[] branches =
> > >> stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
> > >> Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
> > >> Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
> > >>
> > >> branches[0].to("pair");
> > >> branches[1].to("impair");
> > >> -
> > >>
> > >> This is a mix of the options 3) and 1)
> > >>
> > >> On Fri, Jul 6, 2018 at 10:58 PM, Guozhang Wang wrote:
> > >>
> > >>> Hi folks, just to summarize the options we have so far:
> > >>>
> > >>> 1) Add a new "as" for KTable / KStream, plus adding new fields for
> > >>> operators-returns-void control objects (the current wiki's proposal).
> > >>>
> > >>> Pros: no more overloads.
> > >>> Cons: a bit departing with the current high-level API design of the
> > DSL,
> > >>> plus, the inconsistency between operators-returns-void and
> > >>> operators-not-return-voids.
> > >>>
> > >>> 2) Add overloaded functions for all operators, that accepts a new
> > control
> > >>> object "Described".
> > >>>
> > >>> Pros: consistent with current APIs.
> > >>> Cons: lots of overloaded functions to add.
> > >>>
> > >>> 3) Add another default function in the interface (thank you J8!) as
> > John
> > >>> proposed.
> > >>>
> > >>> Pros: no overloaded functions, no "Described".
> > >>> Cons: do we lose lambda functions really (seems not if we provide a
> > >> "named"
> > >>> for each func)? Plus "Described" may be more extensible than a single
> > >>> `String`.
> > >>>
> > >>>
> > >>> My principle of considering which one is better depends primarily on
> > "how
> > >>> to make advanced users easily use the additional API, while keeping
> it
> > >>> hidden from normal users who do not care at all". For that purpose I
> > >> think
> > >>> 3) > 1) > 2).
> > >>>
> > >>> One caveat though, is that changing the interface would not be
> > >>> binary-compatible though source-compatible, right? I.e. users need to
> > >>> recompile their code though no changes needed.
> > >>>
> > >>>
> > >>>
> > >>> Another note: for 3), if we really want to keep extensibility of
> > >> Described
> > >>> we could do sth. like:
> > >>>
> > >>> -
> > >>>
> > >>> public interface Predicate<K, V> {
> > >>> // existing method
> > >>> boolean test(final K key, final V value);
> > >>>
> > >>> // new 

[jira] [Created] (KAFKA-7650) make "auto.create.topics.enable" dynamically configurable.

2018-11-16 Thread xiongqi wu (JIRA)
xiongqi wu created KAFKA-7650:
-

 Summary: make "auto.create.topics.enable"  dynamically 
configurable. 
 Key: KAFKA-7650
 URL: https://issues.apache.org/jira/browse/KAFKA-7650
 Project: Kafka
  Issue Type: Improvement
Reporter: xiongqi wu
Assignee: xiongqi wu


There are several use cases where we want "auto.create.topics.enable" to be 
dynamically configurable. 

For example:
1) a wildcard consumer can recreate deleted topics 
2) a misconfigured consumer that consumes from the wrong cluster can end up 
creating a lot of zombie topics in the target cluster. 

In such cases, we may want to temporarily disable "auto.create.topics.enable", 
and re-enable topic creation later, after the problem is solved, without 
restarting brokers. 
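
If this were made a dynamic broker config, the change could presumably be applied 
cluster-wide through the existing AdminClient config API along the following lines. 
This is only a rough sketch: "auto.create.topics.enable" is not dynamically 
alterable today, and the bootstrap address is illustrative.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DisableAutoTopicCreation {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative

        try (AdminClient admin = AdminClient.create(props)) {
            // Empty resource name = cluster-wide default for dynamic broker configs.
            ConfigResource brokers = new ConfigResource(ConfigResource.Type.BROKER, "");
            Config update = new Config(Collections.singleton(
                    new ConfigEntry("auto.create.topics.enable", "false"))); // hypothetical: not dynamic yet
            admin.alterConfigs(Collections.singletonMap(brokers, update)).all().get();
        }
    }
}
{code}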

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Request for permissions to add KIP for user shnguyen

2018-11-16 Thread Guozhang Wang
Hello Shawn,

You should be able to create KIPs now.

Cheers,
Guozhang

On Fri, Nov 16, 2018 at 3:27 PM Shawn Nguyen 
wrote:

> Hi there,
> Can you grant me permissions for adding a KIP page? My username is shnguyen
> and email is shavvnnngu...@gmail.com on Confluence.
>
> Thanks,
> Shawn
>


-- 
-- Guozhang


Request for permissions to add KIP for user shnguyen

2018-11-16 Thread Shawn Nguyen
Hi there,
Can you grant me permissions for adding a KIP page? My username is shnguyen
and email is shavvnnngu...@gmail.com on Confluence.

Thanks,
Shawn


[jira] [Created] (KAFKA-7649) Flaky test SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7649:
---

 Summary: Flaky test 
SslEndToEndAuthorizationTest.testNoProduceWithDescribeAcl
 Key: KAFKA-7649
 URL: https://issues.apache.org/jira/browse/KAFKA-7649
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


Observed in 
https://builds.apache.org/job/kafka-2.1-jdk8/49/testReport/junit/kafka.api/SslEndToEndAuthorizationTest/testNoProduceWithDescribeAcl/

{code}
Error Message
java.lang.SecurityException: zookeeper.set.acl is true, but the verification of 
the JAAS login file failed.
Stacktrace
java.lang.SecurityException: zookeeper.set.acl is true, but the verification of 
the JAAS login file failed.
at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:361)
at kafka.server.KafkaServer.startup(KafkaServer.scala:202)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
at 
kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:101)
at 
kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:100)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
kafka.integration.KafkaServerTestHarness.setUp(KafkaServerTestHarness.scala:100)
at 
kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:81)
at 
kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:73)
at 
kafka.api.EndToEndAuthorizationTest.setUp(EndToEndAuthorizationTest.scala:180)
at 
kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:72)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 

[jira] [Created] (KAFKA-7648) Flaky test DeleteTopicsRequestTest.testValidDeleteTopicRequests

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7648:
---

 Summary: Flaky test 
DeleteTopicsRequestTest.testValidDeleteTopicRequests
 Key: KAFKA-7648
 URL: https://issues.apache.org/jira/browse/KAFKA-7648
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


Observed in 
[https://builds.apache.org/job/kafka-2.1-jdk8/52/testReport/junit/kafka.server/DeleteTopicsRequestTest/testValidDeleteTopicRequests/]

 
h3. Error Message
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Stacktrace
org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-4' already 
exists.
h3. Standard Output
[2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=2, leaderId=0, fetcherId=0] Error for partition topic-3-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-07 17:53:10,812] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition topic-3-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-07 17:53:14,805] WARN Client session timed out, have not heard from server in 4000ms for sessionid 0x10051eebf480003 (org.apache.zookeeper.ClientCnxn:1112)
[2018-11-07 17:53:14,806] WARN Unable to read additional data from client sessionid 0x10051eebf480003, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376)
[2018-11-07 17:53:14,807] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf480002 (org.apache.zookeeper.ClientCnxn:1112)
[2018-11-07 17:53:14,807] WARN Unable to read additional data from client sessionid 0x10051eebf480002, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376)
[2018-11-07 17:53:14,823] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf480001 (org.apache.zookeeper.ClientCnxn:1112)
[2018-11-07 17:53:14,824] WARN Unable to read additional data from client sessionid 0x10051eebf480001, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376)
[2018-11-07 17:53:15,423] WARN Client session timed out, have not heard from server in 4002ms for sessionid 0x10051eebf48 (org.apache.zookeeper.ClientCnxn:1112)
[2018-11-07 17:53:15,423] WARN Unable to read additional data from client sessionid 0x10051eebf48, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:376)
[2018-11-07 17:53:15,879] WARN fsync-ing the write ahead log in SyncThread:0 took 4456ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide (org.apache.zookeeper.server.persistence.FileTxnLog:338)
[2018-11-07 17:53:16,831] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition topic-4-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-07 17:53:23,087] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error for partition invalid-timeout-1 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-07 17:53:23,088] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition invalid-timeout-3 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-07 17:53:23,137] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition invalid-timeout-0 at offset 0 (kafka.server.ReplicaFetcherThread:76) org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
 
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7647:
---

 Summary: Flaky test 
LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
 Key: KAFKA-7647
 URL: https://issues.apache.org/jira/browse/KAFKA-7647
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


kafka.log.LogCleanerParameterizedIntegrationTest >
testCleansCombinedCompactAndDeleteTopic[3] FAILED
    java.lang.AssertionError: Contents of the map shouldn't change
expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 ->
(354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353),
2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 ->
(348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but
was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354),
1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 ->
(342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
(343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 ->
(299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 ->
(355,355))>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:118)
        at
kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7646) Flaky test SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7646:
---

 Summary: Flaky test 
SaslOAuthBearerSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaSubscribe
 Key: KAFKA-7646
 URL: https://issues.apache.org/jira/browse/KAFKA-7646
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin


This test is reported to fail by [~enothereska] as part of 2.1.0 RC0 release 
certification.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7645) Fix flaky unit test for 2.1 branch

2018-11-16 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-7645:
---

 Summary: Fix flaky unit test for 2.1 branch
 Key: KAFKA-7645
 URL: https://issues.apache.org/jira/browse/KAFKA-7645
 Project: Kafka
  Issue Type: Task
Reporter: Dong Lin
Assignee: Dong Lin






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7644) Worker Re balance enhancements

2018-11-16 Thread satya (JIRA)
satya created KAFKA-7644:


 Summary: Worker Re balance enhancements
 Key: KAFKA-7644
 URL: https://issues.apache.org/jira/browse/KAFKA-7644
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: satya


Currently the Kafka Connect distributed worker triggers a rebalance any time a 
new connector/task is added, irrespective of whether the added connector is a 
source connector or a sink connector. 

My understanding has been that the worker rebalance should be identical to a 
consumer group rebalance. That said, should source connectors not be immune to 
the rebalance?

Are we not supposed to use source connectors with distributed workers?

It does appear to me there is some caveat in the way the worker rebalance is 
working, and it needs enhancement to not trigger unwanted rebalances causing 
restarts of all tasks etc.

Kafka connectors should have a way to not restart and to stay with the existing 
partition assignment if the rebalance trigger is related to a different 
connector.

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7642) Kafka Connect - graceful shutdown of distributed worker

2018-11-16 Thread satya (JIRA)
satya created KAFKA-7642:


 Summary: Kafka Connect - graceful shutdown of distributed worker
 Key: KAFKA-7642
 URL: https://issues.apache.org/jira/browse/KAFKA-7642
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: satya


Currently I don't find any way to gracefully shut down a distributed worker 
other than killing the process.

Could you add a shutdown option to the workers?

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7643) Connectors do not unload even when tasks fail in kafka connect

2018-11-16 Thread satya (JIRA)
satya created KAFKA-7643:


 Summary: Connectors do not unload even when tasks fail in kafka 
connect
 Key: KAFKA-7643
 URL: https://issues.apache.org/jira/browse/KAFKA-7643
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: satya


If there are any issues in the tasks associated with a connector, I have seen 
many times that even though the tasks fail, the connector itself is not 
released. 

The only way out of this is submitting an explicit request to delete 
the connector.

There should be a way to shut down the connectors gracefully if there are 
exceptions encountered in the task that are not retriable.

In addition, Kafka Connect also does not have a graceful exit option. There 
will be situations like server maintenance, outages, etc. where it would be 
prudent to gracefully shut down the connectors rather than performing a DELETE 
through the REST endpoint.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-328: Ability to suppress updates for KTables

2018-11-16 Thread Bill Bejeck
Hi John,

Thanks for the update, I'm +1 on changes and my +1 vote stands.

-Bill

On Fri, Nov 16, 2018 at 4:19 PM John Roesler  wrote:

> Hi all, sorry to do this again, but during review of the code to add the
> metrics proposed in this KIP, the reviewers and I noticed some
> inconsistencies and drawbacks of the metrics I proposed in the KIP.
>
> Here's the diff:
>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=87295409=24=23
>
> * The proposed metrics were all INFO level, but they would be updated on
> every record, creating a performance concern. If we can refactor the
> metrics framework in the future to resolve this concern, we may move the
> metrics back to INFO level.
> * having separate metrics for memory and disk buffers is unnecessarily
> complex. The main utility is determining how close the buffer is to the
> configured limit, which makes a single metric more useful. I've combined
> them into one "suppression-buffer-size-*" metric.
> * The "intermediate-result-suppression-*" metric would be equivalent to the
> "process-*" metric which is already available on the ProcessorNode. I've
> removed it from the KIP.
> * The "suppression-mem-buffer-evict-*" metric had been proposed as a buffer
> metric, but it makes more sense as a processor node metric, since its
> counterpart is the "process-*" metric. I've replaced it with a processor
> node metric, "suppression-emit-*"
>
> Let me know if you want to recast votes in response to this change.
>
> -John
>
> On Thu, Oct 4, 2018 at 11:26 AM John Roesler  wrote:
>
> > Update: Here's a link to the documented eviction behavior:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-BufferEvictionBehavior(akaSuppressEmitBehavior)
> >
> > On Thu, Oct 4, 2018 at 11:12 AM John Roesler  wrote:
> >
> >> Hello again, all,
> >>
> >> During review, we realized that there is a relationship between this
> >> (KIP-328) and KIP-372.
> >>
> >> KIP-372 proposed to allow naming *all* internal topics, and KIP-328 adds
> >> a new internal topic (the changelog for the suppression buffer).
> >>
> >> However, we didn't consider this relationship in either KIP discussion,
> >> possibly since they were discussed and accepted concurrently.
> >>
> >> I have updated KIP-328 to effectively "merge" the two KIPs by adding a
> >> `withName` builder to Suppressed in the style of the other builders
> added
> >> in KIP-372:
> >>
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=87295409=20=19
> >> .
> >>
> >> I think this should be uncontroversial, but as always, let me know of
> any
> >> objections you may have.
> >>
> >>
> >> Also, note that I'll be updating the KIP to document the exact buffer
> >> eviction behavior. I previously treated this as an internal
> implementation
> >> detail, but after consideration, I think users would want to know the
> >> eviction semantics, especially if they are debugging their applications
> and
> >> scrutinizing the sequence of emitted records.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Thu, Sep 20, 2018 at 5:34 PM John Roesler  wrote:
> >>
> >>> Hello all,
> >>>
> >>> During review of https://github.com/apache/kafka/pull/5567 for
> KIP-328,
> >>> the reviewers raised many good suggestions for the API.
> >>>
> >>> The basic design of the suppress operation remains the same, but the
> >>> config object is (in my opinion) far more ergonomic with their
> >>> suggestions.
> >>>
> >>> I have updated the KIP to reflect the new config (
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-NewSuppressOperator
> >>> )
> >>>
> >>> Please let me know if anyone wishes to change their vote, and we call
> >>> for a recast.
> >>>
> >>> Thanks,
> >>> -John
> >>>
> >>> On Thu, Aug 23, 2018 at 12:54 PM Matthias J. Sax <
> matth...@confluent.io>
> >>> wrote:
> >>>
>  It seems nobody has any objections against the change.
> 
>  That's for the KIP improvement. I'll go ahead and merge the PR.
> 
> 
>  -Matthias
> 
>  On 8/21/18 2:44 PM, John Roesler wrote:
>  > Hello again, all,
>  >
>  > I belatedly had a better idea for adding grace period to the Windows
>  class
>  > hierarchy (TimeWindows, UnlimitedWindows, JoinWindows). Instead of
>  > providing the grace-setter in the abstract class and having to
>  retract it
>  > in UnlimitedWindows, I've made the getter abstract method in Windows
>  and
>  > only added setters to Time and Join windows.
>  >
>  > This should not only improve the ergonomics of grace period, but
> make
>  the
>  > whole class hierarchy more maintainable.
>  >
>  > See the PR for more details:
>  https://github.com/apache/kafka/pull/5536
>  >
>  > I've updated the KIP accordingly. Here's the diff:
> 

Re: [VOTE] KIP-328: Ability to suppress updates for KTables

2018-11-16 Thread John Roesler
Hi all, sorry to do this again, but during review of the code to add the
metrics proposed in this KIP, the reviewers and I noticed some
inconsistencies and drawbacks of the metrics I proposed in the KIP.

Here's the diff:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=87295409=24=23

* The proposed metrics were all INFO level, but they would be updated on
every record, creating a performance concern. If we can refactor the
metrics framework in the future to resolve this concern, we may move the
metrics back to INFO level.
* having separate metrics for memory and disk buffers is unnecessarily
complex. The main utility is determining how close the buffer is to the
configured limit, which makes a single metric more useful. I've combined
them into one "suppression-buffer-size-*" metric.
* The "intermediate-result-suppression-*" metric would be equivalent to the
"process-*" metric which is already available on the ProcessorNode. I've
removed it from the KIP.
* The "suppression-mem-buffer-evict-*" metric had been proposed as a buffer
metric, but it makes more sense as a processor node metric, since its
counterpart is the "process-*" metric. I've replaced it with a processor
node metric, "suppression-emit-*"

Let me know if you want to recast votes in response to this change.

-John

On Thu, Oct 4, 2018 at 11:26 AM John Roesler  wrote:

> Update: Here's a link to the documented eviction behavior:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-BufferEvictionBehavior(akaSuppressEmitBehavior)
>
> On Thu, Oct 4, 2018 at 11:12 AM John Roesler  wrote:
>
>> Hello again, all,
>>
>> During review, we realized that there is a relationship between this
>> (KIP-328) and KIP-372.
>>
>> KIP-372 proposed to allow naming *all* internal topics, and KIP-328 adds
>> a new internal topic (the changelog for the suppression buffer).
>>
>> However, we didn't consider this relationship in either KIP discussion,
>> possibly since they were discussed and accepted concurrently.
>>
>> I have updated KIP-328 to effectively "merge" the two KIPs by adding a
>> `withName` builder to Suppressed in the style of the other builders added
>> in KIP-372:
>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=87295409=20=19
>> .
>>
>> I think this should be uncontroversial, but as always, let me know of any
>> objections you may have.
>>
>>
>> Also, note that I'll be updating the KIP to document the exact buffer
>> eviction behavior. I previously treated this as an internal implementation
>> detail, but after consideration, I think users would want to know the
>> eviction semantics, especially if they are debugging their applications and
>> scrutinizing the sequence of emitted records.
>>
>> Thanks,
>> -John
>>
>> On Thu, Sep 20, 2018 at 5:34 PM John Roesler  wrote:
>>
>>> Hello all,
>>>
>>> During review of https://github.com/apache/kafka/pull/5567 for KIP-328,
>>> the reviewers raised many good suggestions for the API.
>>>
>>> The basic design of the suppress operation remains the same, but the
>>> config object is (in my opinion) far more ergonomic with their
>>> suggestions.
>>>
>>> I have updated the KIP to reflect the new config (
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables#KIP-328:AbilitytosuppressupdatesforKTables-NewSuppressOperator
>>> )
>>>
>>> Please let me know if anyone wishes to change their vote, and we call
>>> for a recast.
>>>
>>> Thanks,
>>> -John
>>>
>>> On Thu, Aug 23, 2018 at 12:54 PM Matthias J. Sax 
>>> wrote:
>>>
 It seems nobody has any objections against the change.

 That's for the KIP improvement. I'll go ahead and merge the PR.


 -Matthias

 On 8/21/18 2:44 PM, John Roesler wrote:
 > Hello again, all,
 >
 > I belatedly had a better idea for adding grace period to the Windows
 class
 > hierarchy (TimeWindows, UnlimitedWindows, JoinWindows). Instead of
 > providing the grace-setter in the abstract class and having to
 retract it
 > in UnlimitedWindows, I've made the getter abstract method in Windows
 and
 > only added setters to Time and Join windows.
 >
 > This should not only improve the ergonomics of grace period, but make
 the
 > whole class hierarchy more maintainable.
 >
 > See the PR for more details:
 https://github.com/apache/kafka/pull/5536
 >
 > I've updated the KIP accordingly. Here's the diff:
 >
 https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=87295409=11=9
 >
 > Please let me know if this changes your vote.
 >
 > Thanks,
 > -John
 >
 > On Mon, Aug 13, 2018 at 5:20 PM John Roesler 
 wrote:
 >
 >> Hey all,
 >>
 >> I just wanted to let you know that a few small issues surfaced during
 >> implementation and 

Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-16 Thread Boyang Chen
Thanks Colin and Jason for the further inputs!

I understand that K8s integration will be the long-term solution for KStream to 
serve as a streaming platform, and I buy into the idea of "pre-registration", 
which could save us from using timeout tricks.


> If we have a way to indicate the
> expected group members, then the group can respond to a change much more
> quickly. There would be no need to wait 5 minutes for all members to join
> and it would be robust in the presence of failures. Ironically, static
> membership in this case makes the group more dynamic ;).

The caveat here is that existing KStream users (including us) probably cannot 
migrate to K8s or other management tools immediately, so in the static 
membership design I always try to minimize the amount of operational work 
needed by the client, hoping the user just needs to provide unique id 
generation and two timeout configs to onboard easily. This way it would be much 
easier to onboard static membership for all KStream users, not only benefiting 
K8s-equipped users. The expected member list would be nice to have, and I 
would definitely like to discuss it in a follow-up KIP. Do you think this makes 
sense?

> I think both of these issues could be solved by having some broker-side 
> metadata about groups which is configured through the admin client.  If there 
> was an "expected group size," stored on the broker-side, then we could 
> rebalance immediately whenever the group size reached that size.  Otherwise, 
> we could apply the rebalance delay, like now.  This would give lower latency 
> when setting things up.

Setting the expected group size in an administrative way is a good suggestion; 
however, we need to realize that in daily operations we could always have 
transient failures or unavailable instances (e.g. an AWS EC2 spin-up failure) 
which invalidate our expectation. For example, when scaling up from 4 to 8 
hosts, we would expect the following operations:

  1.  Use admin tool to set group size from 4 to 8
  2.  Spin up new hosts...
  3.  Group will rebalance when we hit 8 join group requests

If in step 2 we fail to spin up one single host, the scale-up mechanism will 
fail and get stuck until we fix the problem by forcing a rebalance of the fleet. 
For fast scale-up, 7 hosts is still better than 4 hosts IMO, and without an 
expansion timeout we are paying extra loading time and human monitoring effort 
to trigger a rebalance if necessary (we are introducing a forceStaticRebalance 
API here). So the expansion timeout and registration timeout are useful to 
reduce the mental burden when one operates the member group, which should be 
helpful.

> Expected group size is just an expectation, so the group would be allowed to 
> get bigger than that.  We could also have another number which was the > 
> maximum group size.  This really would be a hard upper limit on the size of 
> the group, which admins could optionally configure.

The maximum group size is discussed in this 
JIRA. It is also useful for 
dynamic membership, thanks for the suggestion!

> When a new client joined a group, the server could send back a unique random 
> 64-bit member ID.  The client could hold on to this ID and use it whenever it 
> > rejoined the group after a failure.  Since the ID is random and provided by 
> the server, it can't be spoofed or accidentally reused by a misconfigured 
> client.

This is actually the current consumer behavior, and we are discussing whether to 
enforce the member id when accepting a join group request in this 
JIRA. Note that we don't have a way to persist broker-generated member ids on 
the client side through restarts. Each time, restarted consumers will use 
"unknown member id" to join again. This approach has been discussed before, 
and we believe it is fragile to remember ids on the client side. That's why we 
propose KIP-345 on top of it.

Thanks a lot!

Boyang

From: Colin McCabe 
Sent: Saturday, November 17, 2018 2:39 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

As Jason said, there are definitely scenarios where we know how many group 
members we expect ahead of time.  It would be nice if we could distinguish 
between the error case of "we expected 5 clients in the group, but one failed" 
and a case like "4 clients started up quickly but the 5th took an extra 2 
seconds."  We can sandbag the group rebalance delay, but that's a hack which 
has clear disadvantages.

It would also be nice to be able to detect when a group member left the group 
briefly but then came back.

I think both of these issues could be solved by having some broker-side 
metadata about groups which is configured through the admin client.  If there 
was an "expected group size," stored on the broker-side, then we could 
rebalance 

[jira] [Created] (KAFKA-7640) Kafka stream interactive query not returning data when state is backed by rocksdb

2018-11-16 Thread hitesh gollahalli bachanna (JIRA)
hitesh gollahalli bachanna created KAFKA-7640:
-

 Summary: Kafka stream interactive query not returning data when 
state is backed by rocksdb
 Key: KAFKA-7640
 URL: https://issues.apache.org/jira/browse/KAFKA-7640
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0
Reporter: hitesh gollahalli bachanna


I have a Kafka Streams app running with 36 different instances (one for each 
partition). Each instance comes up one after the other. And I am building a rest 
service on top of the state to access the data.

Here some code that I use:
{code:java}
// Call this to find out which host has the key.
StreamsMetadata metadata = streams.metadataForKey(store, key, serializer);
if (localSelf.host().equals(hostStoreInfo.getHost())) {
    // get the key from the local store
} else {
    // call the remote host using restTemplate
}
{code}
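For the local branch, a minimal sketch of fetching the value from the local 
store via the interactive-query API (the key/value types are illustrative, and 
the store is assumed to be a key-value store):
{code:java}
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Continues the snippet above: 'streams', 'store' and 'key' are the same variables.
ReadOnlyKeyValueStore<String, Long> localStore =
        streams.store(store, QueryableStoreTypes.<String, Long>keyValueStore());
Long value = localStore.get(key); // may be null if the key is not in the local store
{code}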
The problem now is that the `metadata` object returned points to one host/ip, 
but the data is actually on a different node. I was able to see this using some 
application logs I printed. This happens every time I start my application.

The `allMetadata` method in the `KafkaStreams` class says the value will be 
updated as partitions get reassigned, but that is not happening in this case. 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7641) Add `group.max.size` to cap group metadata on broker

2018-11-16 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-7641:
--

 Summary: Add `group.max.size` to cap group metadata on broker
 Key: KAFKA-7641
 URL: https://issues.apache.org/jira/browse/KAFKA-7641
 Project: Kafka
  Issue Type: Improvement
Reporter: Boyang Chen






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-16 Thread Colin McCabe
As Jason said, there are definitely scenarios where we know how many group 
members we expect ahead of time.  It would be nice if we could distinguish 
between the error case of "we expected 5 clients in the group, but one failed" 
and a case like "4 clients started up quickly but the 5th took an extra 2 
seconds."  We can sandbag the group rebalance delay, but that's a hack which 
has clear disadvantages.

It would also be nice to be able to detect when a group member left the group 
briefly but then came back.

I think both of these issues could be solved by having some broker-side 
metadata about groups which is configured through the admin client.  If there 
was an "expected group size," stored on the broker-side, then we could 
rebalance immediately whenever the group size reached that size.  Otherwise, we 
could apply the rebalance delay, like now.  This would give lower latency when 
setting things up.

Expected group size is just an expectation, so the group would be allowed to 
get bigger than that.  We could also have another number which was the maximum 
group size.  This really would be a hard upper limit on the size of the group, 
which admins could optionally configure.

When a new client joined a group, the server could send back a unique random 
64-bit member ID.  The client could hold on to this ID and use it whenever it 
rejoined the group after a failure.  Since the ID is random and provided by the 
server, it can't be spoofed or accidentally reused by a misconfigured client.

best,
Colin

On Fri, Nov 16, 2018, at 00:04, Jason Gustafson wrote:
> >
> > If we initialize a set of member names (I assume ids = names here) on
> > broker through Admin API, the client needs to pick up this information
> > simultaneously which I doubt if there is a generic way to achieve that? It
> > would also make the scaling operations difficult if we need to define the
> > member names every time we change the member set which is an extra
> > operation burden. From my daily ops experience, dynamically generate member
> > names on client side would be easier. Is there a good approach to address
> > this issue?
> 
> 
> Yeah, that's a good question. I'm hoping someone with more kubernetes
> experience will jump in here. Basically my goal is to have an approach
> which maps nicely to StatefulSets (
> https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/).
> The pods in a stateful set have an ordinal index, which sounds similar to
> the static ids that I was describing. You can scale up and down a stateful
> set, but you would need a plugin to grow and shrink the consumer group.
> Sounds like it could work, but I'm not sure if it's the best way.
> 
> At Pinterest we maintain 24-7 SLA for 15 - 30 minutes reaction to all the
> > critical streaming services abnormality. One of the burden was the night
> > shift which requires the oncaller to quickly resolve the issue and get the
> > streaming application back on track, however there is a chance of miss. My
> > concern was that if we forfeit the timeout on static membership to trigger
> > rebalance, missing some pages during midnight could be negatively
> > impacting the system performance since we may realize that some partitions
> > stop working for a couple of hours already until next morning. So
> > registration timeout serves as the "last line of defense" to guarantee
> > liveness if no human intervention jumps in.
> 
> 
> Thanks, this is helpful background. I agree this is a risk in the approach
> I've suggested. If we take a step back, I think there are two gaps in the
> protocol for stateful applications:
> 
> 1. We don't have a way to detect the same member across failures or
> restarts. I think streams has some heuristic to try and handle the common
> cases (such as rolling restarts), but the proposal here solves the problem
> in a more robust way.
> 
> 2. We don't have a way to know what the expected membership of the group
> is. This leads us to try tricks like inserting delays into the rebalance
> logic so that the group membership has time to stabilize before we make any
> decisions. In your proposal, we have an expansion timeout, which is
> basically the same thing as far as I can tell.
> 
> I think the first problem is the most important, but it would be nice if we
> can solve the second problem as well. If we have a way to indicate the
> expected group members, then the group can respond to a change much more
> quickly. There would be no need to wait 5 minutes for all members to join
> and it would be robust in the presence of failures. Ironically, static
> membership in this case makes the group more dynamic ;).
> 
> That said, I can see how the registration timeout would be an attractive
> safety net in some cases. Perhaps it would be good enough if we have a way
> to pre-register group members administratively? Members can still be
> expired due to inactivity and we would have a way to get around the
> rebalance delays. 

[jira] [Created] (KAFKA-7639) Read one request at a time from socket to reduce broker memory usage

2018-11-16 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7639:
-

 Summary: Read one request at a time from socket to reduce broker 
memory usage
 Key: KAFKA-7639
 URL: https://issues.apache.org/jira/browse/KAFKA-7639
 Project: Kafka
  Issue Type: Improvement
  Components: network
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.2.0


The broker's Selector currently reads all requests available on the socket when 
the socket is ready for read. These are queued up as staged receives. We mute 
the channel and stop reading any more data until all the staged requests are 
processed. This behaviour is slightly inconsistent since for the initial read 
we drain the socket buffer, allowing it to get filled up again, but if data 
arrives slightly after the initial read, then we don't read from the socket 
buffer until pending requests are processed.

To avoid holding onto requests for longer than required, we should read one 
request at a time even if more data is available in the socket buffer. This is 
especially useful for produce requests which may be large and may take long to 
process.

Note that with the default socket read buffer size of 100K, this is not a 
critical issue. But with larger socket buffers, this could result in excessive 
memory usage if a lot of produce requests are buffered in the broker and the 
producer times out, reconnects and sends more data before the broker has cleared 
older requests. By reading one request at a time, we reduce the amount of time the 
broker holds onto memory for each request.
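
As an illustration of the intended read loop, here is a minimal sketch (hypothetical 
code, not the broker's actual Selector/NetworkReceive implementation) of reading a 
single size-delimited request per poll and leaving any further bytes in the socket 
buffer until that request has been processed:

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

/** Hypothetical sketch: return at most one complete size-delimited request per call. */
public class SingleRequestReader {
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    private ByteBuffer body;

    /** Returns one complete request, or null if the current request is still incomplete. */
    public ByteBuffer readOne(SocketChannel channel) throws IOException {
        if (body == null) {
            if (channel.read(sizeBuffer) < 0)
                throw new IOException("connection closed");
            if (sizeBuffer.hasRemaining())
                return null;                      // size header not fully read yet
            sizeBuffer.flip();
            body = ByteBuffer.allocate(sizeBuffer.getInt());
            sizeBuffer.clear();
        }
        if (channel.read(body) < 0)
            throw new IOException("connection closed");
        if (body.hasRemaining())
            return null;                          // request body not fully read yet
        body.flip();
        ByteBuffer request = body;
        body = null;                              // stop here instead of draining the buffer;
        return request;                           // the caller mutes the channel until processed
    }
}
{code}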



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7638) Trogdor - Support mass task creation endpoint

2018-11-16 Thread Stanislav Kozlovski (JIRA)
Stanislav Kozlovski created KAFKA-7638:
--

 Summary: Trogdor - Support mass task creation endpoint
 Key: KAFKA-7638
 URL: https://issues.apache.org/jira/browse/KAFKA-7638
 Project: Kafka
  Issue Type: Improvement
Reporter: Stanislav Kozlovski
Assignee: Stanislav Kozlovski


Trogdor supports the creation of tasks via the `coordinator/tasks/create` 
endpoint - it currently accepts only one task.

Since Trogdor supports scheduling multiple jobs to execute at a certain time 
(via the `startTime` task parameter leveraged by all tasks), it makes sense to 
support creating multiple tasks with a single request to that endpoint. 
Users might want to leverage the scheduler to, say, create 100 tasks. In the 
current model, they would need to issue 100 requests - which is inefficient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7637) ERROR Error while writing to checkpoint file due to too many open files

2018-11-16 Thread Sander van Loo (JIRA)
Sander van Loo created KAFKA-7637:
-

 Summary: ERROR Error while writing to checkpoint file due to too 
many open files
 Key: KAFKA-7637
 URL: https://issues.apache.org/jira/browse/KAFKA-7637
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.1.1
 Environment: Red Hat Enterprise Linux Server release 7.4 (Maipo)
Reporter: Sander van Loo


We are running a 3 node Kafka cluster on version 1.1.1 on Red Hat Linux 7.

Max open files is set to 65000.

After running for a few days the nodes have the following open file counts:
 * node01d: 2712
 * node01e: 2770
 * node01f: 4102
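
A minimal sketch of how such counts can be read from within the broker JVM, using the 
JDK's UnixOperatingSystemMXBean (illustration only, not part of the broker code):

{code:java}
import com.sun.management.UnixOperatingSystemMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class FdUsage {
    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof UnixOperatingSystemMXBean) {
            // Print current vs maximum file descriptor usage for this JVM.
            UnixOperatingSystemMXBean unixOs = (UnixOperatingSystemMXBean) os;
            System.out.println("open file descriptors: " + unixOs.getOpenFileDescriptorCount()
                    + " / max: " + unixOs.getMaxFileDescriptorCount());
        }
    }
}
{code}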

After a few weeks of runtime the cluster crashes with the following error:

 
{noformat}
[2018-11-12 07:05:16,790] ERROR Error while writing to checkpoint file 
/var/lib/kafka/topics/replication-offset-checkpoint 
(kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: 
/var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at 
kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
    at 
kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
    at 
kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9$adapted(ReplicaManager.scala:1384)
    at scala.Option.foreach(Option.scala:257)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7(ReplicaManager.scala:1384)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7$adapted(ReplicaManager.scala:1381)
    at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
    at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
    at 
kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1381)
    at 
kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:242)
    at 
kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114)
    at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
followed by this one:
{noformat}
[2018-11-12 07:05:16,792] ERROR [ReplicaManager broker=3] Error while writing 
to highwatermark file in directory /var/lib/kafka/topics 
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.KafkaStorageException: Error while writing to 
checkpoint file /var/lib/kafka/topics/replication-offset-checkpoint
Caused by: java.io.FileNotFoundException: 
/var/lib/kafka/topics/replication-offset-checkpoint.tmp (Too many open files)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at 
kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:52)
    at 
kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:50)
    at 
kafka.server.checkpoints.OffsetCheckpointFile.write(OffsetCheckpointFile.scala:59)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9(ReplicaManager.scala:1384)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$9$adapted(ReplicaManager.scala:1384)
    at scala.Option.foreach(Option.scala:257)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7(ReplicaManager.scala:1384)
    at 
kafka.server.ReplicaManager.$anonfun$checkpointHighWatermarks$7$adapted(ReplicaManager.scala:1381)
    at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:120)
    at 

[jira] [Created] (KAFKA-7636) Allow consumer to update maxPollRecords value

2018-11-16 Thread Kcirtap Seven (JIRA)
Kcirtap Seven created KAFKA-7636:


 Summary: Allow consumer to update maxPollRecords value
 Key: KAFKA-7636
 URL: https://issues.apache.org/jira/browse/KAFKA-7636
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.0.1
Reporter: Kcirtap Seven


Hi,

We have two use cases where we would need to change the max.poll.records 
parameter on the fly:

1. We offer a REST API to get 'feedbacks'. This API takes a 'count' parameter. 
The system was previously based on Cassandra. It is now based on Kafka and 
'feedbacks' are stored in Kafka topics. 
To stay compliant with the legacy interface contract, we would like to be able 
to change max.poll.records on the fly to take this 'count' parameter into 
account. 

2. We receive 'notification requests' related to a 'sender' via a REST API. We 
store those requests into topics (one per sender). 
Each sender is associated with a weight. Here is the algorithm that processes 
the requests:

    1. At each iteration, we process at most n records (n configurable) across 
the whole set of requests. For this example, let's say 100. 
    2. We compute the max poll records for each sender. Let's say we have 3 
senders with weights 2, 1 and 1. Hence 50 records max for the first one and 25 
for each of the other two. 
    3. We consume the topics one after the other. We would like to reallocate 
some capacity to the remaining consumers if max.poll.records is not reached for 
the current consumer. Let's say at each iteration we make the following 
synchronous calls:
            sender1Consumer.poll() with computed max.poll.records 50
            sender2Consumer.poll() with computed max.poll.records 25
            sender3Consumer.poll() with computed max.poll.records 25
       If the first call returns only 10 records, we would like to reallocate 
the 40 "spare" records to the other consumers, 20 each for instance (or 
another strategy). We would instead make the following calls: 
            sender2Consumer.poll() with updated max.poll.records 45
            sender3Consumer.poll() with updated max.poll.records 45

 

For that requirement we also need to change the max.poll.records on the fly.
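
To make the arithmetic concrete, here is a minimal sketch of the allocation and 
reallocation described above (plain Java, independent of the consumer API; the 
class and method names are ours):

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class PollBudget {
    /** Split 'total' records across senders proportionally to their weights. */
    static Map<String, Integer> allocate(Map<String, Integer> weights, int total) {
        int weightSum = weights.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Integer> shares = new LinkedHashMap<>();
        weights.forEach((sender, w) -> shares.put(sender, total * w / weightSum));
        return shares;
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("sender1", 2);
        weights.put("sender2", 1);
        weights.put("sender3", 1);

        Map<String, Integer> shares = allocate(weights, 100);   // {sender1=50, sender2=25, sender3=25}

        // sender1's poll returned only 10 records: redistribute the 40 spare
        // records evenly over the two remaining senders (one possible strategy).
        int spare = shares.get("sender1") - 10;
        int remaining = 2;
        shares.put("sender2", shares.get("sender2") + spare / remaining);  // 45
        shares.put("sender3", shares.get("sender3") + spare / remaining);  // 45
        System.out.println(shares);
    }
}
{code}

If max.poll.records could be updated between polls, the values computed this way 
could simply be applied to each consumer before its poll() call.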


Regards,



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7565) NPE in KafkaConsumer

2018-11-16 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7565.
---
   Resolution: Duplicate
Fix Version/s: (was: 2.2.0)

[~avakhrenev] Thanks for testing, closing this issue.

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find minimal reproducer, but it happens quite often in our system. 
> We use {{pause()}} and {{wakeup()}} methods quite extensively, maybe it is 
> somehow related.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers

2018-11-16 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7576.
---
Resolution: Fixed
  Reviewer: Jason Gustafson

> Dynamic update of replica fetcher threads may fail to start/close fetchers
> --
>
> Key: KAFKA-7576
> URL: https://issues.apache.org/jira/browse/KAFKA-7576
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.1, 2.0.1, 2.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.1.2, 2.1.1, 2.0.2
>
>
> KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown 
> sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers 
> can now throw an exception because the Selector may be closed on one thread 
> while data is being written on another. KAFKA-7464 changed this behaviour 
> for 2.0.1 and 2.1.0: the exception during close() is now logged and not 
> propagated, to avoid exceptions during broker shutdown.
> When a config update notification for `num.replica.fetchers` is processed, 
> partitions are migrated as necessary to increase or decrease the number of 
> fetcher threads. Existing fetchers are shut down before new ones are 
> created. This migration is performed on the thread processing the ZK change 
> notification. Shutting down the Selector of an existing fetcher is not safe 
> since the replica fetcher thread may be processing data using the same 
> Selector at the time.
> Without the fix from KAFKA-7464, another config update or a broker restart 
> is required to restart the replica fetchers after a dynamic config update 
> if shutdown encounters an exception.
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at sun.nio.ch.IOUtil.write(IOUtil.java:68)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
> at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
> at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
> at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
> at org.apache.kafka.common.network.Selector.close(Selector.java:736)
> at org.apache.kafka.common.network.Selector.close(Selector.java:698)
> at org.apache.kafka.common.network.Selector.close(Selector.java:314)
> at 
> org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
> at 
> kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
> at 
> kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
> at 
> kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
> at 
> kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at 
> kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
> at 
> kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
> at 
> kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
> at 
> kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at 
> kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
> kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with 

Build failed in Jenkins: kafka-trunk-jdk8 #3205

2018-11-16 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-6774; Improve the default group id behavior in KafkaConsumer

--
[...truncated 2.59 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueIsEqualWithNullForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualWithNullForCompareValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareValueWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 

[jira] [Created] (KAFKA-7635) FetcherThread stops processing after "Error processing data for partition"

2018-11-16 Thread Steven Aerts (JIRA)
Steven Aerts created KAFKA-7635:
---

 Summary: FetcherThread stops processing after "Error processing 
data for partition"
 Key: KAFKA-7635
 URL: https://issues.apache.org/jira/browse/KAFKA-7635
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 2.0.0
Reporter: Steven Aerts
 Attachments: stacktraces.txt

After disabling unclean leader election again, following recovery from a situation 
where we had enabled it due to a split brain in ZooKeeper, we saw that some of our 
brokers stopped replicating their partitions.

Digging into the logs, we saw that the replica fetcher thread was stopped because one 
partition had a failure which threw an [{{Error processing data for partition}} 
exception|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L207].
 But the broker kept running and serving the partitions for which it was 
leader.

We saw three different types of exceptions triggering this (example stacktraces 
attached):
* {{kafka.common.UnexpectedAppendOffsetException}}
* {{Trying to roll a new log segment for topic partition partition-b-97 with 
start offset 1388 while it already exists.}}
* {{Kafka scheduler is not running.}}

We think there are two acceptable ways for the Kafka broker to handle this:
* Mark those partitions as partitions with errors and handle them accordingly, 
as is done [when a {{CorruptRecordException}} or 
{{KafkaStorageException}}|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/AbstractFetcherThread.scala#L196]
 is thrown.
* Exit the broker as is done [when log truncation is not 
allowed|https://github.com/apache/kafka/blob/2.0.0/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L189].
 
Maybe even a combination of both. Our (probably naive) idea is that for the 
first two types the first strategy would be best, but for the last type it 
is probably better to re-throw a {{FatalExitError}} and exit the broker.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6774) Improve default groupId behavior in consumer

2018-11-16 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6774.

Resolution: Fixed

> Improve default groupId behavior in consumer
> 
>
> Key: KAFKA-6774
> URL: https://issues.apache.org/jira/browse/KAFKA-6774
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> At the moment, the default groupId in the consumer is "". If you try to use 
> this to subscribe() to a topic, the broker will reject the group as invalid. 
> On the other hand, if you use it with assign(), then the user will be able to 
> fetch and commit offsets using the empty groupId. Probably 99% of the time, 
> this is not what the user expects. Instead you would probably expect that if 
> no groupId is provided, then no committed offsets will be fetched at all and 
> we'll just use the auto reset behavior if we don't have a current position.
> Here are two potential solutions (both requiring a KIP):
> 1. Change the default to null. We will preserve the current behavior for 
> subscribe(). When using assign(), we will not bother fetching committed 
> offsets for the null groupId, and any attempt to commit offsets will raise an 
> error. The user can still use the empty groupId, but they have to specify it 
> explicitly.
> 2. Keep the current default, but change the consumer to treat this value as 
> if it were null as described in option 1. The argument for this behavior is 
> that using the empty groupId to commit offsets is inherently a dangerous 
> practice and should not be permitted. We'd have to convince ourselves that 
> we're fine not needing to allow the empty groupId for backwards compatibility 
> though.
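>
> A sketch of how option 1 would look from the application's perspective (the 
> exact exception type is still to be decided; {{InvalidGroupIdException}} is 
> assumed here for illustration, and the topic name is made up):
> {code:java}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.KafkaException;
> import org.apache.kafka.common.TopicPartition;
>
> public class NoGroupIdConsumer {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                   "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                   "org.apache.kafka.common.serialization.StringDeserializer");
>         // No group.id configured: under option 1 the default becomes null.
>
>         try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
>             // assign() still works; no committed offsets are fetched, so the
>             // position falls back to auto.offset.reset.
>             consumer.assign(Collections.singleton(new TopicPartition("my-topic", 0)));
>             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
>             System.out.println("fetched " + records.count() + " records");
>
>             // Committing offsets without a group id would raise an error,
>             // e.g. an InvalidGroupIdException.
>             consumer.commitSync();
>         } catch (KafkaException e) {
>             System.err.println("commit without group.id rejected: " + e);
>         }
>     }
> }
> {code}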



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

2018-11-16 Thread Jason Gustafson
>
> If we initialize a set of member names (I assume ids = names here) on the
> broker through the Admin API, the client needs to pick up this information
> simultaneously, and I doubt there is a generic way to achieve that. It
> would also make scaling operations difficult if we need to define the
> member names every time we change the member set, which is an extra
> operational burden. From my daily ops experience, dynamically generating
> member names on the client side would be easier. Is there a good approach
> to address this issue?


Yeah, that's a good question. I'm hoping someone with more kubernetes
experience will jump in here. Basically my goal is to have an approach
which maps nicely to StatefulSets (
https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/).
The pods in a stateful set have an ordinal index, which sounds similar to
the static ids that I was describing. You can scale up and down a stateful
set, but you would need a plugin to grow and shrink the consumer group.
Sounds like it could work, but I'm not sure if it's the best way.
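
For example, here is a rough sketch of what I mean (the config key name below is
hypothetical since we haven't settled on one): a pod could derive its static id
from the ordinal in its StatefulSet hostname and pass it to the consumer:

    import java.util.Properties;

    public class StaticMemberIdFromOrdinal {
        public static void main(String[] args) {
            // StatefulSet pods get stable hostnames like "my-app-0", "my-app-1", ...
            String hostname = System.getenv().getOrDefault("HOSTNAME", "my-app-0");
            String ordinal = hostname.substring(hostname.lastIndexOf('-') + 1);

            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            // Hypothetical config key for the static member name proposed in this KIP.
            props.put("group.member.name", "my-group-member-" + ordinal);
            // ... construct the KafkaConsumer with these properties as usual
        }
    }

Scaling the StatefulSet up or down keeps the ids deterministic, though as noted
above something would still need to tell the group about the new expected
membership.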

At Pinterest we maintain 24-7 SLA for 15 - 30 minutes reaction to all the
> critical streaming services abnormality. One of the burden was the night
> shift, which requires the on-caller to quickly resolve the issue and get the
> streaming application back on track; however, there is a chance of a miss. My
> concern was that if we forfeit the timeout on static membership to trigger
> rebalance, missing some pages during midnight could be negatively
> impacting the system performance since we may realize that some partitions
> stop working for a couple of hours already until next morning. So
> registration timeout serves as the "last line of defense" to guarantee
> liveness if no human intervention jumps in.


Thanks, this is helpful background. I agree this is a risk in the approach
I've suggested. If we take a step back, I think there are two gaps in the
protocol for stateful applications:

1. We don't have a way to detect the same member across failures or
restarts. I think streams has some heuristic to try and handle the common
cases (such as rolling restarts), but the proposal here solves the problem
in a more robust way.

2. We don't have a way to know what the expected membership of the group
is. This leads us to try tricks like inserting delays into the rebalance
logic so that the group membership has time to stabilize before we make any
decisions. In your proposal, we have an expansion timeout, which is
basically the same thing as far as I can tell.

I think the first problem is the most important, but it would be nice if we
can solve the second problem as well. If we have a way to indicate the
expected group members, then the group can respond to a change much more
quickly. There would be no need to wait 5 minutes for all members to join
and it would be robust in the presence of failures. Ironically, static
membership in this case makes the group more dynamic ;).

That said, I can see how the registration timeout would be an attractive
safety net in some cases. Perhaps it would be good enough if we have a way
to pre-register group members administratively? Members can still be
expired due to inactivity and we would have a way to get around the
rebalance delays. Would that work?

Thanks,
Jason


On Wed, Nov 14, 2018 at 10:24 PM, Boyang Chen  wrote:

> Thank you for the clarification Jason! The proposals make sense here and
> let me continue the discussion.
>
> > Then the ids would be determined using some convention. Most likely, we
> would just use sequential numbers 0, 1, 2,
> > etc. We do the same thing for partition ids.
>
>
> If we initialize a set of member names (I assume ids = names here) on the
> broker through the Admin API, the client needs to pick up this information
> simultaneously, and I doubt there is a generic way to achieve that. It
> would also make scaling operations difficult if we need to define the
> member names every time we change the member set, which is an extra
> operational burden. From my daily ops experience, dynamically generating
> member names on the client side would be easier. Is there a good approach
> to address this issue?
>
> > I was thinking that the registration is specified ahead of time and
> remains valid until changed. It would be more like a
> > replica assignment. We don't move partitions just because a broker is
> down.
> > The expectation is that the broker will eventually return.
>
>
> At Pinterest we maintain 24-7 SLA for 15 - 30 minutes reaction to all the
> critical streaming services abnormality. One of the burden was the night
> shift, which requires the on-caller to quickly resolve the issue and get the
> streaming application back on track; however, there is a chance of a miss. My
> concern was that if we forfeit the timeout on static membership to trigger
> rebalance, missing some pages during midnight could be negatively impacting
> the system performance since we may realize that some