[jira] [Commented] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2018-01-18 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331722#comment-16331722
 ] 

Matthias J. Sax commented on KAFKA-6461:


For me, it also passes as-is, but not always. I agree that {{value}} 
could be {{null}}; maybe it crashes if we hit a bad cache flush.

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2018-01-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331672#comment-16331672
 ] 

Ted Yu commented on KAFKA-6461:
---

With the following:
{code}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTab
index f3eceb0..5eaabbb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -79,7 +79,7 @@ public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
         @Override
         public void apply(final Long key, final String value) {
             numRecordsExpected++;
-            if (value.equals(expected)) {
+            if (value != null && value.equals(expected)) {
                 boolean ret = finalResultReached.compareAndSet(false, true);
 
                 if (!ret) {
{code}
TableTableJoinIntegrationTest passes.
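
For reference, the null guard in the patch above can also be written with {{Objects.equals}}. The following is only a minimal sketch, not the actual test code: the class {{NullSafeResultCheck}} and its constructor are hypothetical stand-ins for the callback in the test, and the sample value is made up. It assumes {{expected}} is never null, which the constructor enforces, so that it behaves the same as {{value != null && value.equals(expected)}}.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the test's result-checking callback; not the Kafka test itself.
final class NullSafeResultCheck {
    private final String expected;
    private final AtomicBoolean finalResultReached = new AtomicBoolean(false);

    NullSafeResultCheck(final String expected) {
        // Assumption: the expected value is never null, so a null record can never match.
        this.expected = Objects.requireNonNull(expected, "expected");
    }

    // Called once per output record; value may be null.
    void apply(final Long key, final String value) {
        // Objects.equals returns false for a null value instead of throwing.
        if (Objects.equals(value, expected)) {
            finalResultReached.compareAndSet(false, true);
        }
    }

    boolean finalResultReached() {
        return finalResultReached.get();
    }

    public static void main(final String[] args) {
        final NullSafeResultCheck check = new NullSafeResultCheck("some-final-value");
        check.apply(1L, null);               // ignored, no NullPointerException
        check.apply(1L, "some-final-value"); // marks the final result as reached
        System.out.println(check.finalResultReached());
    }
}
```

Either form only stops the callback from throwing on a null value; it does not explain why the expected final result sometimes never arrives.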

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}





[jira] [Commented] (KAFKA-6232) SaslSslAdminClientIntegrationTest sometimes fails

2018-01-18 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331606#comment-16331606
 ] 

Ted Yu commented on KAFKA-6232:
---

Haven't seen this failure lately.

> SaslSslAdminClientIntegrationTest sometimes fails
> -
>
> Key: KAFKA-6232
> URL: https://issues.apache.org/jira/browse/KAFKA-6232
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Priority: Major
>  Labels: security
> Attachments: saslSslAdminClientIntegrationTest-203.out
>
>
> Here was one recent occurrence:
> https://builds.apache.org/job/kafka-trunk-jdk9/203/testReport/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.LeaderNotAvailableException: There is no 
> leader for this topic-partition as we are in the middle of a leadership 
> election.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:225)
>   at 
> kafka.api.AdminClientIntegrationTest.$anonfun$testLogStartOffsetCheckpoint$3(AdminClientIntegrationTest.scala:762)
> {code}
> In the test output, I saw:
> {code}
> [2017-11-17 23:15:45,593] ERROR [KafkaApi-1] Error when handling request 
> {filters=[{resource_type=2,resource_name=foobar,principal=User:ANONYMOUS,host=*,operation=3,permission_type=3},{resource_type=5,resource_name=transactional_id,principal=User:ANONYMOUS,host=*,operation=4,permission_type=3}]}
>  (kafka.server.KafkaApis:107)
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=2, connectionId=127.0.0.1:36295-127.0.0.1:58183-0, 
> session=Session(User:client2,localhost/127.0.0.1), 
> listenerName=ListenerName(SASL_SSL), securityProtocol=SASL_SSL, buffer=null) 
> is not authorized.
> {code}





[jira] [Commented] (KAFKA-6462) ResetIntegrationTest unstable

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331540#comment-16331540
 ] 

ASF GitHub Bot commented on KAFKA-6462:
---

mjsax opened a new pull request #4446: KAFKA-6462: fix unstable 
ResetIntegrationTest
URL: https://github.com/apache/kafka/pull/4446
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ResetIntegrationTest unstable
> -
>
> Key: KAFKA-6462
> URL: https://issues.apache.org/jira/browse/KAFKA-6462
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 6 ms.
> at 
> org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1155)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:847)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(IntegrationTestUtils.java:133)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(IntegrationTestUtils.java:118)
> at 
> org.apache.kafka.streams.integration.AbstractResetIntegrationTest.add10InputElements(AbstractResetIntegrationTest.java:199)
> at 
> org.apache.kafka.streams.integration.AbstractResetIntegrationTest.prepareTest(AbstractResetIntegrationTest.java:175)
> at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.before(ResetIntegrationTest.java:56){noformat}





[jira] [Created] (KAFKA-6462) ResetIntegrationTest unstable

2018-01-18 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6462:
--

 Summary: ResetIntegrationTest unstable
 Key: KAFKA-6462
 URL: https://issues.apache.org/jira/browse/KAFKA-6462
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax


{noformat}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
at 
org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1155)
at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:847)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(IntegrationTestUtils.java:133)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(IntegrationTestUtils.java:118)
at 
org.apache.kafka.streams.integration.AbstractResetIntegrationTest.add10InputElements(AbstractResetIntegrationTest.java:199)
at 
org.apache.kafka.streams.integration.AbstractResetIntegrationTest.prepareTest(AbstractResetIntegrationTest.java:175)
at 
org.apache.kafka.streams.integration.ResetIntegrationTest.before(ResetIntegrationTest.java:56){noformat}





[jira] [Updated] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2018-01-18 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6461:
---
Priority: Minor  (was: Major)

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}





[jira] [Updated] (KAFKA-6185) Selector memory leak with high likelihood of OOM in case of down conversion

2018-01-18 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-6185:
---
Component/s: core

> Selector memory leak with high likelihood of OOM in case of down conversion
> ---
>
> Key: KAFKA-6185
> URL: https://issues.apache.org/jira/browse/KAFKA-6185
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
> Environment: Ubuntu 14.04.5 LTS
> 5 brokers: 1&2 on 1.0.0 3,4,5 on 0.11.0.1
> inter.broker.protocol.version=0.11.0.1
> log.message.format.version=0.11.0.1
> clients a mix of 0.9, 0.10, 0.11
>Reporter: Brett Rann
>Assignee: Rajini Sivaram
>Priority: Blocker
>  Labels: regression
> Fix For: 1.1.0, 1.0.1
>
> Attachments: Kafka_Internals___Datadog.png, 
> Kafka_Internals___Datadog.png
>
>
> We are testing 1.0.0 in a couple of environments.
> Both have about 5 brokers, with two 1.0.0 brokers and the rest 0.11.0.1 
> brokers.
> One is using on-disk message format 0.9.0.1, the other 0.11.0.1.
> We have 0.9, 0.10, and 0.11 clients connecting.
> The cluster on the 0.9.0.1 format has been running fine for a week.
> But the cluster on the 0.11.0.1 format is consistently having memory issues, 
> only on the two upgraded brokers running 1.0.0.
> The first occurrence of the error comes along with this stack trace:
> {noformat}
> {"timestamp":"2017-11-06 
> 14:22:32,402","level":"ERROR","logger":"kafka.server.KafkaApis","thread":"kafka-request-handler-7","message":"[KafkaApi-1]
>  Error when handling request 
> {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=maxwell.users,partitions=[{partition=0,fetch_offset=227537,max_bytes=1100},{partition=4,fetch_offset=354468,max_bytes=1100},{partition=5,fetch_offset=266524,max_bytes=1100},{partition=8,fetch_offset=324562,max_bytes=1100},{partition=10,fetch_offset=292931,max_bytes=1100},{partition=12,fetch_offset=325718,max_bytes=1100},{partition=15,fetch_offset=229036,max_bytes=1100}]}]}"}
> java.lang.OutOfMemoryError: Java heap space
> at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> at 
> org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101)
> at 
> org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:520)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:518)
> at scala.Option.map(Option.scala:146)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:518)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:508)
> at scala.Option.flatMap(Option.scala:171)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:508)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:556)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:555)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:555)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:569)
> at 
> kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2034)
> at 
> kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:52)
> at 
> kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2033)
> at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:569)
> at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:588)
> at 
> kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:175)
> at 
> 

[jira] [Commented] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331390#comment-16331390
 ] 

ASF GitHub Bot commented on KAFKA-6461:
---

mjsax opened a new pull request #4443: KAFKA-6461: fix 
TableTableJoinIntegrationTest [WIP]
URL: https://github.com/apache/kafka/pull/4443
 
 
   DO NOT MERGE -- WIP for debugging




> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}





[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2018-01-18 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331348#comment-16331348
 ] 

Guozhang Wang commented on KAFKA-3184:
--

[~yuzhih...@gmail.com] sorry for the late reply!

Your understanding is basically right, and here are my thoughts about the 
flushing:

1. It should not be expensive or stop the world, since flush may be called 
on each commit. I was thinking that it could either be async (but we need to 
keep the checkpointed offsets consistent with the checkpoint image on disk 
itself), or we only do the checkpointing every N flush calls.

2. As for {{persistent()}}, currently it is only used in 
{{ProcessorStateManager#checkpoint}} and 
{{StoreChangelogReader#restoredOffsets}}; with in-memory state stores being 
checkpointed periodically, I think we can just deprecate this flag and let 
these two callers always checkpoint / save restored offsets.
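
To make the "checkpoint every N flush calls" option in point 1 concrete, here is a minimal sketch. It is not Kafka Streams code: the class {{EveryNthFlushCheckpointer}} and its methods are hypothetical, and writing the snapshot to disk is reduced to a comment. It only illustrates counting flushes and recording the checkpointed offset together with the snapshot so the two stay consistent.

```java
// Hypothetical helper, not part of Kafka Streams: decides whether a flush also checkpoints.
final class EveryNthFlushCheckpointer {
    private final int interval;             // checkpoint on every interval-th flush
    private long flushCount = 0L;
    private long checkpointedOffset = -1L;  // -1 means no checkpoint has been taken yet

    EveryNthFlushCheckpointer(final int interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    // Returns true when this flush triggered a checkpoint.
    boolean onFlush(final long currentOffset) {
        flushCount++;
        if (flushCount % interval != 0) {
            return false; // cheap path: most flushes skip checkpointing
        }
        // A real store would write its snapshot to disk here and record the offset
        // only once the snapshot is durable, so that both describe the same state.
        checkpointedOffset = currentOffset;
        return true;
    }

    long checkpointedOffset() {
        return checkpointedOffset;
    }

    public static void main(final String[] args) {
        final EveryNthFlushCheckpointer checkpointer = new EveryNthFlushCheckpointer(3);
        for (long offset = 10L; offset <= 60L; offset += 10L) {
            System.out.println(offset + " checkpointed=" + checkpointer.onFlush(offset));
        }
    }
}
```

The trade-off in this sketch is that a restore may have to replay up to N - 1 flushes' worth of changelog after the last checkpoint.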

> Add Checkpoint for In-memory State Store
> 
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: user-experience
>
> Currently Kafka Streams does not make a checkpoint of the persistent state 
> store upon committing, which would be expensive since it is "stopping the 
> world" and writes to disk: for example, RocksDB would require you to copy the 
> file directory to make a copy naively. 
> However, for in-memory stores checkpointing may be doable in an asynchronous 
> manner, hence it can be done quickly. The benefit of having intermediate 
> checkpoints is to avoid restoring from scratch if standby tasks are not 
> present.





[jira] [Created] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2018-01-18 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-6461:
--

 Summary: TableTableJoinIntegrationTest is unstable if caching is 
enabled
 Key: KAFKA-6461
 URL: https://issues.apache.org/jira/browse/KAFKA-6461
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Matthias J. Sax
Assignee: Matthias J. Sax


{noformat}
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testLeftInner[caching enabled = true] FAILED
20:41:05 java.lang.AssertionError: Condition not met within timeout 15000. 
Never received expected final result.
20:41:05 at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
20:41:05 at 
org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
20:41:05 at 
org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
20:41:05 at 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}





[jira] [Commented] (KAFKA-4932) Add UUID Serde

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331304#comment-16331304
 ] 

ASF GitHub Bot commented on KAFKA-4932:
---

brandonkirchner opened a new pull request #4438: KAFKA-4932 add UUID serializer 
/ deserializer
URL: https://github.com/apache/kafka/pull/4438
 
 
   [KAFKA-4932](https://issues.apache.org/jira/browse/KAFKA-4932)
   
   Added a UUID Serializer / Deserializer.
   
   Added the UUID type to the SerializationTest




> Add UUID Serde
> --
>
> Key: KAFKA-4932
> URL: https://issues.apache.org/jira/browse/KAFKA-4932
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Jeff Klukas
>Assignee: Brandon Kirchner
>Priority: Minor
>  Labels: needs-kip, newbie
>
> I propose adding serializers and deserializers for the java.util.UUID class.
> I have many use cases where I want to set the key of a Kafka message to be a 
> UUID. Currently, I need to turn UUIDs into strings or byte arrays and use 
> their associated Serdes, but it would be more convenient to serialize and 
> deserialize UUIDs directly.
> I'd propose that the serializer and deserializer use the 36-byte string 
> representation, calling UUID.toString and UUID.fromString, and then using the 
> existing StringSerializer / StringDeserializer to finish the job. We would 
> also wrap these in a Serde and modify the streams Serdes class to include 
> this in the list of supported types.
> Optionally, we could have the deserializer support a 16-byte representation 
> and it would check the size of the input byte array to determine whether it's 
> a binary or string representation of the UUID. It's not well defined whether 
> the most significant bits or least significant go first, so this deserializer 
> would have to support only one or the other.
> Similarly, if the deserializer supported a 16-byte representation, there could 
> be two variants of the serializer, a UUIDStringSerializer and a 
> UUIDBytesSerializer.
> I would be willing to write this PR, but am looking for feedback about 
> whether there are significant concerns here around ambiguity of what the byte 
> representation of a UUID should be, or if there's desire to keep the list of 
> built-in Serdes minimal such that a PR would be unlikely to be accepted.
> KIP Link: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization
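
The two representations discussed in the description can be sketched as follows. This is an illustration only, not the code from the pull request: the class {{UuidEncodingSketch}} and its method names are hypothetical, it uses plain JDK types instead of the Kafka {{Serializer}} / {{Deserializer}} interfaces, and it assumes the most significant bits come first in the 16-byte form, which is exactly the ordering the description says is not well defined.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical sketch of the 36-byte string form and the 16-byte binary form of a UUID.
final class UuidEncodingSketch {

    // 36-byte form: UUID.toString encoded as UTF-8.
    static byte[] toStringBytes(final UUID id) {
        return id == null ? null : id.toString().getBytes(StandardCharsets.UTF_8);
    }

    // 16-byte form; assumption: most significant bits are written first.
    static byte[] toBinaryBytes(final UUID id) {
        if (id == null) {
            return null;
        }
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    // Picks the representation from the array length, as the description suggests.
    static UUID fromBytes(final byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length == 16) {
            final ByteBuffer buffer = ByteBuffer.wrap(data);
            final long mostSignificant = buffer.getLong();
            final long leastSignificant = buffer.getLong();
            return new UUID(mostSignificant, leastSignificant);
        }
        if (data.length == 36) {
            return UUID.fromString(new String(data, StandardCharsets.UTF_8));
        }
        throw new IllegalArgumentException("Unexpected UUID length: " + data.length);
    }

    public static void main(final String[] args) {
        final UUID id = UUID.randomUUID();
        System.out.println(id.equals(fromBytes(toStringBytes(id))));
        System.out.println(id.equals(fromBytes(toBinaryBytes(id))));
    }
}
```

Dispatching on the array length works here because the canonical string form is always 36 ASCII characters, so the two encodings cannot be confused.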





[jira] [Commented] (KAFKA-4932) Add UUID Serde

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331303#comment-16331303
 ] 

ASF GitHub Bot commented on KAFKA-4932:
---

brandonkirchner closed pull request #4438: KAFKA-4932 add UUID serializer / 
deserializer
URL: https://github.com/apache/kafka/pull/4438
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
index d6b4d2da611..3377a27897b 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java
@@ -20,6 +20,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * Factory for creating serializers / deserializers.
@@ -112,6 +113,12 @@ public ByteArraySerde() {
         }
     }
 
+    static public final class UUIDSerde extends WrapperSerde<UUID> {
+        public UUIDSerde() {
+            super(new UUIDSerializer(), new UUIDDeserializer());
+        }
+    }
+
     @SuppressWarnings("unchecked")
     static public <T> Serde<T> serdeFrom(Class<T> type) {
         if (String.class.isAssignableFrom(type)) {
@@ -150,9 +157,13 @@ public ByteArraySerde() {
             return (Serde<T>) Bytes();
         }
 
+        if (UUID.class.isAssignableFrom(type)) {
+            return (Serde<T>) UUID();
+        }
+
         // TODO: we can also serializes objects of type T using generic Java serialization by default
         throw new IllegalArgumentException("Unknown class for built-in serializer. Supported types are: " +
-            "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes");
+            "String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID");
     }
 
     /**
@@ -228,6 +239,13 @@ public ByteArraySerde() {
         return new BytesSerde();
     }
 
+    /*
+     * A serde for nullable {@code UUID} type
+     */
+    static public Serde<UUID> UUID() {
+        return new UUIDSerde();
+    }
+
     /*
      * A serde for nullable {@code byte[]} type.
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
new file mode 100644
index 000..81fc0e331c7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/UUIDDeserializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.UUID;
+
+public class UUIDDeserializer implements Deserializer<UUID> {
+    private String encoding = "UTF8";
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
+        Object encodingValue = configs.get(propertyName);
+        if (encodingValue == null)
+            encodingValue = configs.get("deserializer.encoding");
+        if (encodingValue != null && encodingValue instanceof String)
+            encoding = (String) encodingValue;
+    }
+
+    @Override
+    public UUID deserialize(String topic, byte[] data) {
+        try {
+            if (data == null)
+                return null;
+            else
+                return UUID.fromString(new String(data, encoding));
+        } catch (UnsupportedEncodingException e) {
+            throw new SerializationException("Error when deserializing byte[] to UUID due to unsupported encoding " + encoding);
+        }
+    }
+
+    @Override
+    public void close() {
+      // do nothing
+    }
+}
diff --git 

[jira] [Resolved] (KAFKA-6398) Non-aggregation KTable generation operator does not construct value getter correctly

2018-01-18 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6398.
--
   Resolution: Fixed
Fix Version/s: 1.0.1
   1.1.0

> Non-aggregation KTable generation operator does not construct value getter 
> correctly
> 
>
> Key: KAFKA-6398
> URL: https://issues.apache.org/jira/browse/KAFKA-6398
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bug
> Fix For: 1.1.0, 1.0.1
>
>
> For any operator that generates a KTable, its {{valueGetterSupplier}} has 
> three code paths:
> 1. If the operator is a KTable source operator, it uses its materialized state 
> store for the value getter (note that currently we always materialize on KTable 
> source).
> 2. If the operator is an aggregation operator, then its generated KTable 
> should always be materialized so we just use its materialized state store.
> 3. Otherwise, we treat the value getter on a per-operator basis.
> For 3) above, what we SHOULD do is this: if the generated KTable is 
> materialized, the value getter just relies on its materialized state 
> store to get the value; otherwise we rely on the operator itself to 
> define which parent's value getter to inherit and what computational logic to 
> apply on the fly to get the value. For example, for {{KTable#filter()}} where 
> the {{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just 
> get from the parent's value getter and then apply the filter on the fly; in 
> addition, we should let future operators access its parent's 
> materialized state store via {{connectProcessorAndStateStore}}.
> However, the current code does not do this correctly: 1) it does not check 
> whether the result KTable is materialized, but always tries to use its parent's 
> value getter, and 2) it does not try to connect its parent's materialized 
> store to the future operator. As a result, operators such as 
> {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} would 
> result in a TopologyException when building. The following is an example:
> 
> Using a non-materialized KTable in a stream-table join fails:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(...);
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> fails with
> {noformat}
> org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology 
> building: StateStore null is not added yet.
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563)
> {noformat}
> Adding a store name is not sufficient as a workaround; it fails differently:
> {noformat}
> final KTable filteredKTable = builder.table("table-topic").filter(..., 
> "STORE-NAME");
> builder.stream("stream-topic").join(filteredKTable,...);
> {noformat}
> error:
> {noformat}
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor KSTREAM-JOIN-05
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153)
> Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid 
> topology building: Processor KSTREAM-JOIN-05 has no access to 
> StateStore KTABLE-SOURCE-STATE-STORE-00
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121)
>   at 
> 

[jira] [Updated] (KAFKA-5083) always leave the last surviving member of the ISR in ZK

2018-01-18 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-5083:
---
Fix Version/s: 1.1.0

> always leave the last surviving member of the ISR in ZK
> ---
>
> Key: KAFKA-5083
> URL: https://issues.apache.org/jira/browse/KAFKA-5083
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Onur Karaman
>Assignee: Onur Karaman
>Priority: Major
> Fix For: 1.1.0
>
>
> Currently we erase ISR membership if the replica to be removed from the ISR 
> is the last surviving member of the ISR and unclean leader election is 
> enabled for the corresponding topic.
> We should investigate leaving the last replica in the ISR in ZK, independent of 
> whether unclean leader election is enabled or not. That way, if people 
> re-disable unclean leader election, we can still try to elect the leader 
> from the last in-sync replica.





[jira] [Commented] (KAFKA-5117) Kafka Connect REST endpoints reveal Password typed values

2018-01-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5117?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331104#comment-16331104
 ] 

ASF GitHub Bot commented on KAFKA-5117:
---

Tang8330 opened a new pull request #4441: [KAFKA-5117]: Password Mask to Kafka 
Connect REST Endpoint
URL: https://github.com/apache/kafka/pull/4441
 
 
   ⚠️  WIP ⚠️ 



> Kafka Connect REST endpoints reveal Password typed values
> -
>
> Key: KAFKA-5117
> URL: https://issues.apache.org/jira/browse/KAFKA-5117
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Thomas Holmes
>Priority: Major
>  Labels: needs-kip
>
> A Kafka Connect connector can specify ConfigDef keys as type of Password. 
> This type was added to prevent logging the values (instead "[hidden]" is 
> logged).
> This change does not apply to the values returned by executing a GET on 
> {{connectors/\{connector-name\}}} and 
> {{connectors/\{connector-name\}/config}}. This creates an easily accessible 
> way for an attacker who has infiltrated your network to gain access to 
> potential secrets that should not be available.
> I have started on a code change that addresses this issue by parsing the 
> config values through the ConfigDef for the connector and returning their 
> output instead (which leads to the masking of Password typed configs as 
> [hidden]).
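
The masking idea described above can be sketched roughly as follows. This is a minimal, dependency-free illustration, not the code from the pull request: the class name PasswordMaskingSketch, the mask method, and the passwordKeys set (standing in for the keys a connector's ConfigDef declares as type Password) are all assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: passwordKeys stands in for the keys that a connector's
// ConfigDef declares as type Password. No Kafka classes are used here.
final class PasswordMaskingSketch {
    static final String HIDDEN = "[hidden]";

    // Returns a copy of the raw config in which every Password-typed key is
    // replaced by "[hidden]"; the input map is left unmodified.
    static Map<String, String> mask(Map<String, String> rawConfig, Set<String> passwordKeys) {
        Map<String, String> masked = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : rawConfig.entrySet()) {
            boolean secret = passwordKeys.contains(entry.getKey());
            masked.put(entry.getKey(), secret ? HIDDEN : entry.getValue());
        }
        return masked;
    }
}
```

A REST handler would return the masked copy rather than the stored config, so the stored secret stays usable by the connector itself.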





[jira] [Comment Edited] (KAFKA-1120) Controller could miss a broker state change

2018-01-18 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331093#comment-16331093
 ] 

Jeff Widman edited comment on KAFKA-1120 at 1/18/18 7:58 PM:
-

The issue description says "the broker will be in this weird state until it is 
restarted."

Could this also be fixed by simply forcing a controller re-election by 
removing the /controller znode, since that will re-identify the leaders? In some 
scenarios, it seems that might be a lighter-weight solution. I understand this 
does not fix the root cause in the code, but I just want to be sure I understand 
what options I have if we hit this in an emergency situation.


was (Author: jeffwidman):
The issue description says "the broker will be in this weird state until it is 
restarted."

Couldn't this also be fixed by simply forcing a controller re-election? Since 
it will re-identify the leaders?

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>Assignee: Mickael Maison
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state since the controller doesn't inform it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.
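
To illustrate why the listener sees no change, here is a small sketch under stated assumptions. It models each broker registration as a broker id mapped to a hypothetical registration epoch (any value that differs between two registrations of the same id). The class and method names are invented for this example, and it is not meant to describe how Kafka actually addresses the issue.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: snapshots map broker id -> registration epoch, where the
// epoch is assumed to change every time a broker re-registers.
final class BrokerChangeSketch {
    // Compares only the sets of broker ids, as a listener that looks at the
    // final list of registered brokers would. A broker that de-registered and
    // re-registered between the two snapshots is invisible to this check.
    static boolean changedByIds(Map<Integer, Long> before, Map<Integer, Long> after) {
        return !before.keySet().equals(after.keySet());
    }

    // Returns the ids that are new or whose registration epoch differs, which
    // also catches a bounce that left the id set unchanged.
    static Set<Integer> reRegistered(Map<Integer, Long> before, Map<Integer, Long> after) {
        Set<Integer> result = new TreeSet<>();
        for (Map.Entry<Integer, Long> entry : after.entrySet()) {
            Long previousEpoch = before.get(entry.getKey());
            if (previousEpoch == null || !previousEpoch.equals(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

With snapshots {1=100, 2=101} and {1=100, 2=250}, the id-only comparison reports no change, while the epoch comparison flags broker 2 as restarted.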





[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2018-01-18 Thread Jeff Widman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16331093#comment-16331093
 ] 

Jeff Widman commented on KAFKA-1120:


The issue description says "the broker will be in this weird state until it is 
restarted."

Couldn't this also be fixed by simply forcing a controller re-election? Since 
it will re-identify the leaders?



[jira] [Updated] (KAFKA-5697) StreamThread.close() need to interrupt the stream threads to break the loop

2018-01-18 Thread Mariam John (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mariam John updated KAFKA-5697:
---
Description: 
In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
the stream thread may eventually check it and shut itself down. However, under 
certain scenarios the thread may get blocked within a single loop and hence 
will never check this state enum. For example, its {{consumer.poll}} call 
triggers {{ensureCoordinatorReady()}}, which will block until the coordinator can 
be found. If the coordinator broker is never up and running, then the Stream 
instance will be blocked forever.

A simple way to reproduce this issue is to start the word count demo without 
starting the ZK / Kafka broker; it will then get stuck in a single loop, and 
even `ctrl-C` will not stop it since its set state will never be read by the 
thread:
{code:java}
[2017-08-03 15:17:39,981] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,046] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,101] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,206] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,261] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,366] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,472] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
{code}
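
The difference between only setting the state and also interrupting the thread can be sketched with plain Java threads. This is an illustration under assumptions, not the StreamThread implementation: the class InterruptOnCloseSketch and its methods are hypothetical, and a latch that is never released stands in for the blocking wait for the coordinator.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: a worker that blocks inside its loop, so a state flag
// alone is never re-read. No Kafka classes are involved.
final class InterruptOnCloseSketch {
    private volatile boolean running = true;
    // Never counted down: stands in for "coordinator found".
    private final CountDownLatch coordinatorReady = new CountDownLatch(1);
    private final CountDownLatch enteredLoop = new CountDownLatch(1);
    private final Thread worker = new Thread(this::runLoop, "stream-thread-sketch");

    InterruptOnCloseSketch() {
        worker.setDaemon(true); // do not keep the JVM alive if never closed
    }

    private void runLoop() {
        while (running) {
            enteredLoop.countDown();
            try {
                coordinatorReady.await(); // blocks indefinitely
            } catch (InterruptedException e) {
                // Interrupted by close(): loop around and re-check the flag.
            }
        }
    }

    // Starts the worker and waits until it is inside its loop.
    void start() {
        worker.start();
        try {
            enteredLoop.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Only sets the state; a blocked worker never observes it.
    void requestStop() {
        running = false;
    }

    // Sets the state and interrupts the worker so the blocking call returns.
    void close() {
        running = false;
        worker.interrupt();
    }

    // Waits up to the given time and reports whether the worker has terminated.
    boolean awaitTermination(long millis) {
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```

In this sketch, calling requestStop() leaves the worker alive, whereas close() lets it leave the blocking call, read the flag, and exit.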
 

  was:
In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
the stream thread may eventually check it and shut itself down. However, under 
certain scenarios the thread may get blocked within a single loop and hence 
will never check this state enum. For example, its {{consumer.poll}} call 
triggers {{ensureCoordinatorReady()}}, which will block until the coordinator can 
be found. If the coordinator broker is never up and running, then the Stream 
instance will be blocked forever.

A simple way to reproduce this issue is to start the word count demo without 
starting the ZK / Kafka broker; it will then get stuck in a single loop, and 
even `ctrl-C` will not stop it since its set state will never be read by the 
thread:

{code}
[2017-08-03 15:17:39,981] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,046] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,101] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,206] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,261] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,366] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-03 15:17:40,472] WARN Connection to node -1 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
^C[2017-08-03 15:17:40,580] WARN Connection to node -1 could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient)
{code}


> StreamThread.close() need to interrupt the stream threads to break the loop
> ---
>
> Key: KAFKA-5697
> URL: https://issues.apache.org/jira/browse/KAFKA-5697
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Mariam John
>Priority: Major
>
> In {{StreamThread.close()}} we currently do nothing but set the state, hoping 
> the stream thread may eventually check it and shut itself down. However, under 
> certain scenarios the thread may get blocked within a single loop and hence 
> will never check this state enum. For example, its {{consumer.poll}} call 
> triggers {{ensureCoordinatorReady()}}, which will block