[jira] [Resolved] (KAFKA-6796) Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas

2018-04-24 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-6796.

   Resolution: Fixed
Fix Version/s: 1.2.0

> Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas
> -
>
> Key: KAFKA-6796
> URL: https://issues.apache.org/jira/browse/KAFKA-6796
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.2.0
>
>
> Currently if the client sends a produce request or a fetch request to a 
> broker which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a 
> bit surprising to see when the topic actually exists. It would be better to 
> return NOT_LEADER to avoid confusion. Clients typically handle both errors by 
> refreshing metadata and retrying, so changing this should not cause any 
> change in behavior on the client. This case can be hit following a partition 
> reassignment after the leader is moved and the local replica is deleted.
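
For context, a minimal client-side sketch (not part of this fix) of why the change is compatible: in the Java client both UNKNOWN_TOPIC_OR_PARTITION and NOT_LEADER surface as subclasses of RetriableException, so the producer refreshes metadata and retries either one. Broker address and topic name below are placeholders.

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

import java.util.Properties;

public class RetriableErrorSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                // UNKNOWN_TOPIC_OR_PARTITION and NOT_LEADER both map to retriable
                // exceptions; the producer retries them internally after a metadata
                // refresh, so the callback only sees them once retries are exhausted.
                if (exception instanceof RetriableException) {
                    System.err.println("Retriable error surfaced: " + exception);
                }
            });
        }
    }
}
{code}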



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6796) Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451660#comment-16451660
 ] 

ASF GitHub Bot commented on KAFKA-6796:
---

ijuma closed pull request #4883: KAFKA-6796; Fix surprising UNKNOWN_TOPIC error 
from requests to non-replicas
URL: https://github.com/apache/kafka/pull/4883
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index a0caa4a53c0..7bc9e3ee750 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -287,7 +287,7 @@ class KafkaApis(val requestChannel: RequestChannel,
      for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) {
        if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
          unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED)
-       else if (!metadataCache.contains(topicPartition.topic))
+       else if (!metadataCache.contains(topicPartition))
          nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
@@ -401,7 +401,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
      if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic)))
        unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
-     else if (!metadataCache.contains(topicPartition.topic))
+     else if (!metadataCache.contains(topicPartition))
        nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        authorizedRequestInfo += (topicPartition -> memoryRecords)
@@ -502,13 +502,13 @@ class KafkaApis(val requestChannel: RequestChannel,
    if (fetchRequest.isFromFollower()) {
      // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
-       fetchContext.foreachPartition((part, data) => {
-         if (!metadataCache.contains(part.topic)) {
-           erroneous += part -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+       fetchContext.foreachPartition((topicPartition, data) => {
+         if (!metadataCache.contains(topicPartition)) {
+           erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
              FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
              FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
          } else {
-           interesting += (part -> data)
+           interesting += (topicPartition -> data)
          }
        })
      } else {
@@ -520,17 +520,17 @@ class KafkaApis(val requestChannel: RequestChannel,
      }
    } else {
      // Regular Kafka consumers need READ permission on each partition they are fetching.
-     fetchContext.foreachPartition((part, data) => {
-       if (!authorize(request.session, Read, new Resource(Topic, part.topic)))
-         erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+     fetchContext.foreachPartition((topicPartition, data) => {
+       if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic)))
+         erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
          FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
          FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-       else if (!metadataCache.contains(part.topic))
-         erroneous += part -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+       else if (!metadataCache.contains(topicPartition))
+         erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
          FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
          FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
        else
-         interesting += (part -> data)
+         interesting += (topicPartition -> data)
      })
    }
 
@@ -1062,7 +1062,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 // version 0 reads offsets from ZK
 val 

[jira] [Assigned] (KAFKA-6824) Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener

2018-04-24 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner reassigned KAFKA-6824:
---

Assignee: Rajini Sivaram

> Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener
> --
>
> Key: KAFKA-6824
> URL: https://issues.apache.org/jira/browse/KAFKA-6824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Anna Povzner
>Assignee: Rajini Sivaram
>Priority: Major
>
> Observed two failures of this test (both in PR builds) :(
>  
> *Failure #1: (JDK 7 and Scala 2.11 )*
> *17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
> testAddRemoveSslListener FAILED
> *17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>
> *17:20:49*         at org.junit.Assert.fail(Assert.java:88)
> *17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
>  
> *Failure #2: (JDK 8)*
> *18:46:23* kafka.server.DynamicBrokerReconfigurationTest > 
> testAddRemoveSslListener FAILED
> *18:46:23*     java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.
> *18:46:23*         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
> *18:46:23*         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
> *18:46:23*         at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
> *18:46:23*         at 
> kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:953)
> *18:46:23*         at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
> *18:46:23*         at scala.collection.Iterator.foreach(Iterator.scala:929)
> *18:46:23*         at scala.collection.Iterator.foreach$(Iterator.scala:929)
> *18:46:23*         at 
> scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> *18:46:23*         at 
> scala.collection.IterableLike.foreach(IterableLike.scala:71)
> *18:46:23*         at 
> scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> *18:46:23*         at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> *18:46:23*         at 
> scala.collection.TraversableLike.map(TraversableLike.scala:234)
> *18:46:23*         at 
> scala.collection.TraversableLike.map$(TraversableLike.scala:227)
> *18:46:23*         at 
> scala.collection.AbstractTraversable.map(Traversable.scala:104)
> *18:46:23*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:953)
> *18:46:23*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:816)
> *18:46:23*         at 
> kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
> *18:46:23*
> *18:46:23*         Caused by:
> *18:46:23*         
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
> not the leader for that topic-partition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2018-04-24 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451438#comment-16451438
 ] 

Apurva Mehta commented on KAFKA-6817:
-

Hi [~odinodinodin]. Did you try setting transactional.id.expiration.ms to 1 year 
and then reproducing the problem?

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps 
> (typically 1 year old). The topic we are writing to has retention.ms set to 1 
> year, so we expect the messages to stay there.
> After digging through the ProducerStateManager code in the Kafka source code, 
> we have a theory of what might be wrong.
> ProducerStateManager.removeExpiredProducers() seems to remove producers from 
> memory erroneously when processing records which are older than 
> maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
> configuration), which is set by default to 7 days. 
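
A minimal, hypothetical sketch of the reproduction scenario described above (topic name, broker address, and serializers are placeholders; this is not the reporter's actual application): producing records whose embedded timestamps are roughly one year in the past.

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class OldTimestampRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("enable.idempotence", "true");          // idempotent/EOS-style producer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Record timestamp roughly one year in the past, as in the report above.
        long oldTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(365);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("exactly-once-test-topic-v2", null,
                    oldTimestamp, "22", "some-value"));
            producer.flush();
        }
    }
}
{code}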



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6376) Improve Streams metrics for skipped records

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451356#comment-16451356
 ] 

ASF GitHub Bot commented on KAFKA-6376:
---

guozhangwang closed pull request #4922: KAFKA-6376: Document skipped records 
metrics changes
URL: https://github.com/apache/kafka/pull/4922
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/ops.html b/docs/ops.html
index 6ffe97653e6..450a268a2a1 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1353,7 +1353,12 @@ Streams API changes in 1.2.0
+
+We have removed the skippedDueToDeserializationError-rate and skippedDueToDeserializationError-total metrics.
+Deserialization errors, and all other causes of record skipping, are now accounted for in the pre-existing metrics
+skipped-records-rate and skipped-records-total. When a record is skipped, the event is
+now logged at WARN level. If these warnings become burdensome, we recommend explicitly filtering out unprocessable
+records instead of depending on record skipping semantics. For more details, see
+https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics;>KIP-274.
+As of right now, the potential causes of skipped records are:
+
+
+null keys in table sources
+null keys in table-table inner/left/outer/right joins
+null keys or values in stream-table joins
+null keys or values in stream-stream joins
+null keys or values in aggregations on grouped streams
+null keys or values in reductions on grouped streams
+null keys in aggregations on windowed streams
+null keys in reductions on windowed streams
+null keys in aggregations on session-windowed streams
+
+Errors producing results, when the configured default.production.exception.handler decides to CONTINUE (the default is to FAIL and throw an exception).
+
+
+Errors deserializing records, when the configured default.deserialization.exception.handler decides to CONTINUE (the default is to FAIL and throw an exception).
+This was the case previously captured in the skippedDueToDeserializationError metrics.
+
+Fetched records having a negative timestamp.
+
+
 
 We have added support for methods in ReadOnlyWindowStore which allows for querying a single window's key-value pair.
 For users who have customized window store implementations on the above interface, they'd need to update their code to implement the newly added method as well.
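
Not part of the PR above; a minimal Kafka Streams sketch of the recommendation to filter out unprocessable records explicitly rather than relying on the skip-and-log behaviour (topic names, serdes, and the surrounding application setup are placeholders):

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class FilterUnprocessableRecords {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Drop records with null keys or values up front; otherwise the library
        // skips them, which now only shows up in the skipped-records metrics and
        // WARN-level logs described above.
        input.filter((key, value) -> key != null && value != null)
             .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```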


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop record with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.
> KIP: : 

[jira] [Commented] (KAFKA-6819) Refactor build-in StreamsMetrics internal implementations

2018-04-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451257#comment-16451257
 ] 

Guozhang Wang commented on KAFKA-6819:
--

Thanks for your feedback, John.

For 2.a), the only reason I'm thinking of a static function is that in some 
unit tests the StreamThread's StreamThreadMetrics may not be initialized while 
some other classes may want to access its sensors. In this case we may indeed 
want to lazily create the sensors. Your proposal differs from mine in that 
you're suggesting to bookkeep all the ever-created sensors in SMI, while I was 
proposing to bookkeep all the ever-created sensors at each layer in its 
corresponding XMetrics class. While reviewing your PR 4917 I think your 
proposal may be better, since then users only need to access one class, 
StreamsMetrics (and internally SMI), to clean up any sensors ever created, 
either during normal runs or during unit testing.

> Refactor build-in StreamsMetrics internal implementations
> -
>
> Key: KAFKA-6819
> URL: https://issues.apache.org/jira/browse/KAFKA-6819
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Our current internal implementations of StreamsMetrics and the different layered 
> metrics like StreamsMetricsThreadImpl, TaskMetrics, NodeMetrics etc. are a bit 
> messy nowadays. We could improve on the current situation by doing the 
> following:
> 0. For thread-level metrics, refactor the {{StreamsMetricsThreadImpl}} class 
> to {{ThreadMetrics}} such that a) it does not extend from 
> {{StreamsMetricsImpl}} but takes the {{StreamsMetricsImpl}} as a constructor 
> parameter, and b) its constructor is removed and replaced with a static 
> {{addAllSensors(threadName)}} that tries to register all the thread-level 
> sensors for the given thread name.
> 1. Add a static function for each of the built-in sensors of the thread-level 
> metrics in {{ThreadMetrics}} that relies on the internal 
> {{StreamsMetricsConventions}} to get thread-level sensor names. If the sensor 
> cannot be found in the internal {{Metrics}} registry, create the sensor 
> on the fly.
> 2.a Add a static {{removeAllSensors(threadName)}} function in 
> {{ThreadMetrics}} that tries to de-register all the thread-level metrics for 
> this thread; if there are no sensors it is a no-op. We will trigger this 
> function in {{StreamThread#close()}}, and similarly call it in 
> {{TopologyTestDriver}} when we close the driver. As a result, the 
> {{ThreadMetrics}} class itself would only contain static functions with no 
> member fields at all.
> 2.b We can consider doing the same for {{TaskMetrics}}, {{NodeMetrics}} and 
> {{NamedCacheMetrics}} as well, and add a {{StoreMetrics}} following the same 
> pattern: although these metrics are not accessed outside their enclosing 
> classes today, this may change in the future.
> 3. Then, we only pass {{StreamsMetricsImpl}} around between the internal 
> classes, to access the specific sensor whenever trying to record it.
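
A rough, hypothetical sketch of what item 2.a above could look like, written against the generic {{Metrics}} registry; the class shape follows the proposal, but the sensor naming and the single example sensor are made up for illustration:

{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

// Hypothetical sketch of the proposal above: only static functions, no member fields.
public final class ThreadMetrics {

    private ThreadMetrics() { }

    // Made-up naming convention standing in for StreamsMetricsConventions.
    private static String commitSensorName(final String threadName) {
        return "thread." + threadName + ".commit";
    }

    // Look the sensor up in the registry and create it lazily if it is missing.
    public static Sensor commitSensor(final Metrics metrics, final String threadName) {
        Sensor sensor = metrics.getSensor(commitSensorName(threadName));
        if (sensor == null) {
            sensor = metrics.sensor(commitSensorName(threadName));
            // ... register rate/latency metrics on the sensor here ...
        }
        return sensor;
    }

    // De-register all thread-level sensors; a no-op if none were ever created.
    public static void removeAllSensors(final Metrics metrics, final String threadName) {
        metrics.removeSensor(commitSensorName(threadName));
    }
}
{code}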



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16451258#comment-16451258
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

bbejeck opened a new pull request #4923: KAFKA-6761: Part 1 of 3; Graph nodes
URL: https://github.com/apache/kafka/pull/4923
 
 
   This PR supersedes PR #4654 as it was growing too large. All comments in 
that PR should be addressed here.
I will attempt to break the PRs for the topology optimization effort into 3 
PRs total and will follow this general plan:
   
   1. This PR only adds the graph nodes and the graph. The graph nodes hold 
the information used to make calls to the `InternalTopologyBuilder` when using 
the DSL. Graph nodes are stored in the `StreamsTopologyGraph` until the final 
topology needs to be built, then the graph is traversed and optimizations are 
made at that point.  There are no new tests in this PR; the follow-up PR will 
rely on all current Streams tests, which should suffice.
   2. PR 2 will intercept all DSL calls and build the graph.  The 
`InternalStreamsBuilder` uses the graph to provide the required info to the 
`InternalTopologyBuilder` and build a topology.  The condition of satisfaction 
for this PR is that all current unit, integration and system tests pass using 
the graph.
   3. PR 3 adds some optimizations, mainly automatically repartitioning for 
operations that may modify a key and have child operations that would normally 
create a separate repartition topic, saving possibly unnecessary repartition 
topics.  For example, the following topology:
   ```
   KStream<String, String> mappedStreamOther = inputStream.map(
       new KeyValueMapper<String, String, KeyValue<String, String>>() {
           @Override
           public KeyValue<String, String> apply(String key, String value) {
               return KeyValue.pair(key.substring(0, 3), value);
           }
       });

   mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("count-one-out");
   mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(1)).count().toStream().to("count-two-out");
   mappedStreamOther.groupByKey().windowedBy(TimeWindows.of(15000)).count().toStream().to("count-three-out");
   ```
   would create 3 repartition topics, but after applying an optimization 
strategy, only one is created.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce Kafka Streams Footprint
> --
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 1.2.0
>
>
> The persistent storage footprint of a Kafka Streams application contains the 
> following aspects:
>  # The internal topics created on the Kafka cluster side.
>  # The materialized state stores on the Kafka Streams application instance 
> side.
> There have been some questions about reducing these footprints, especially 
> since many of them are not necessary. For example, there are redundant 
> internal topics, as well as unnecessary state stores that take up space and 
> also affect performance. When people push Streams to production with high 
> traffic, this issue becomes more common and severe. Reducing the footprint of 
> Streams has clear benefits for reducing the resource utilization of Kafka 
> Streams applications, and also for not putting pressure on brokers' capacity.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5846) Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified

2018-04-24 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16191405#comment-16191405
 ] 

Ted Yu edited comment on KAFKA-5846 at 4/24/18 9:13 PM:


Patch looks good to me.


was (Author: yuzhih...@gmail.com):
Patch looks good to me .

> Use singleton NoOpConsumerRebalanceListener in subscribe() call where 
> listener is not specified
> ---
>
> Key: KAFKA-5846
> URL: https://issues.apache.org/jira/browse/KAFKA-5846
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Ted Yu
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> Currently KafkaConsumer creates an instance of NoOpConsumerRebalanceListener 
> for each subscribe() call where a ConsumerRebalanceListener is not specified:
> {code}
> public void subscribe(Pattern pattern) {
> subscribe(pattern, new NoOpConsumerRebalanceListener());
> {code}
> We can create a singleton NoOpConsumerRebalanceListener to be used in such 
> scenarios.
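
A minimal sketch of the proposed change (simplified; not the actual patch):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;

import java.util.regex.Pattern;

// Sketch only: share one stateless listener instead of allocating a new one per call.
public class SingletonListenerSketch {
    private static final ConsumerRebalanceListener NO_OP_LISTENER =
            new NoOpConsumerRebalanceListener();

    public void subscribe(Pattern pattern) {
        subscribe(pattern, NO_OP_LISTENER);
    }

    public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        // ... the real subscription logic in KafkaConsumer goes here ...
    }
}
{code}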



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread

2018-04-24 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333251#comment-16333251
 ] 

Ted Yu edited comment on KAFKA-6303 at 4/24/18 9:12 PM:


+1 from me.


was (Author: yuzhih...@gmail.com):
+1

> Potential lack of synchronization in NioEchoServer#AcceptorThread
> -
>
> Key: KAFKA-6303
> URL: https://issues.apache.org/jira/browse/KAFKA-6303
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Ted Yu
>Assignee: siva santhalingam
>Priority: Minor
>
> In the run() method:
> {code}
> SocketChannel socketChannel = 
> ((ServerSocketChannel) key.channel()).accept();
> socketChannel.configureBlocking(false);
> newChannels.add(socketChannel);
> {code}
> Modifications to newChannels should be protected by a synchronized block.
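
A sketch of the suggested fix, with the surrounding class reduced to the relevant pieces:

{code:java}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

// Sketch only: guard the shared list so the acceptor thread and the consumer of
// newChannels do not race on it.
class AcceptorSketch {
    private final List<SocketChannel> newChannels = new ArrayList<>();

    void accept(final SelectionKey key) throws IOException {
        SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
        socketChannel.configureBlocking(false);
        synchronized (newChannels) {
            newChannels.add(socketChannel);
        }
    }
}
{code}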



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-6824) Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener

2018-04-24 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-6824:

Comment: was deleted

(was: Another failure of this test, but now a different reason: (JDK 8)

*18:46:23* kafka.server.DynamicBrokerReconfigurationTest > 
testAddRemoveSslListener FAILED

*18:46:23*     java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

*18:46:23*         at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)

*18:46:23*         at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)

*18:46:23*         at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:953)

*18:46:23*         at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)

*18:46:23*         at scala.collection.Iterator.foreach(Iterator.scala:929)

*18:46:23*         at scala.collection.Iterator.foreach$(Iterator.scala:929)

*18:46:23*         at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1417)

*18:46:23*         at 
scala.collection.IterableLike.foreach(IterableLike.scala:71)

*18:46:23*         at 
scala.collection.IterableLike.foreach$(IterableLike.scala:70)

*18:46:23*         at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)

*18:46:23*         at 
scala.collection.TraversableLike.map(TraversableLike.scala:234)

*18:46:23*         at 
scala.collection.TraversableLike.map$(TraversableLike.scala:227)

*18:46:23*         at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:953)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:816)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)

*18:46:23* 

*18:46:23*         Caused by:

*18:46:23*         
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.)

> Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener
> --
>
> Key: KAFKA-6824
> URL: https://issues.apache.org/jira/browse/KAFKA-6824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Anna Povzner
>Priority: Major
>
> Observed two failures of this test (both in PR builds) :(
> *Failure #1: (JDK 7 and Scala 2.11 )*
> *17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
> testAddRemoveSslListener FAILED
> *17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>
> *17:20:49*         at org.junit.Assert.fail(Assert.java:88)
> *17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
>  
> *Failure #2: (JDK 8)*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6824) Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener

2018-04-24 Thread Anna Povzner (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anna Povzner updated KAFKA-6824:

Description: 
Observed two failures of this test (both in PR builds) :(

*Failure #1: (JDK 7 and Scala 2.11 )*

*17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
testAddRemoveSslListener FAILED

*17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>

*17:20:49*         at org.junit.Assert.fail(Assert.java:88)

*17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)

 

*Failure #2: (JDK 8)*

  was:
Saw in my PR build (*JDK 7 and Scala 2.11* ):

*17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
testAddRemoveSslListener FAILED

*17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>

*17:20:49*         at org.junit.Assert.fail(Assert.java:88)

*17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)

*17:20:50*


> Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener
> --
>
> Key: KAFKA-6824
> URL: https://issues.apache.org/jira/browse/KAFKA-6824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Anna Povzner
>Priority: Major
>
> Observed two failures of this test (both in PR builds) :(
> *Failure #1: (JDK 7 and Scala 2.11 )*
> *17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
> testAddRemoveSslListener FAILED
> *17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>
> *17:20:49*         at org.junit.Assert.fail(Assert.java:88)
> *17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
>  
> *Failure #2: (JDK 8)*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6820) Improve on StreamsMetrics Public APIs

2018-04-24 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450473#comment-16450473
 ] 

John Roesler commented on KAFKA-6820:
-

I'm +1 on separating the built-in metrics from the methods we expose for public 
use.

 

Right now, I'm of the opinion it would be nicer to just expose methods for 
getting a sensor at the Thread and at the ProcessorNode level. The process of 
adding metrics to a sensor is really not that bad, and we can provide 
documentation to help with it.

This would allow users to measure exactly what they need to, without having to 
create new feature requests.

Plus, it seems like they'd currently have to dig in to understand what the 
latencyAndThroughputSensor is versus a throughPutSensor, just to decide which 
one they need. This process includes learning what they would have needed to 
learn to just add their own metrics, so I argue these convenience methods are 
strictly more complicated to understand than raw sensors/metrics.

I think it would be even simpler if we just handed them the Metrics object, but 
that would mean users would have to manage unloading their sensors inside 
Processor#close(), and this is only straightforward for ProcessorNode-level 
metrics. To correctly unload higher-level metrics, they'd have to track the 
number of children and unload the metric only when it has no more children. 
Plus, it's too easy to introduce contention by creating a top-level metric 
(it's both not obvious that this will introduce contention, and easy to do 
even if you didn't mean to).

For this reason, I think we should track user-created sensors at each level up 
to the thread, which means we have to control their creation and not give a 
direct reference to the Metrics registry.

> Improve on StreamsMetrics Public APIs
> -
>
> Key: KAFKA-6820
> URL: https://issues.apache.org/jira/browse/KAFKA-6820
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Our current `addLatencyAndThroughputSensor`, `addThroughputSensor` are not 
> very well designed and hence not very user friendly for people adding their 
> customized sensors. We could consider improving on this feature. Some related 
> things to consider:
> 1. Our internal built-in metrics should be independent of these public APIs, 
> which are for user-customized sensors only. See KAFKA-6819 for a related 
> description.
> 2. We could enforce the possible scopeName values, and clearly document the 
> sensor hierarchies that would be incurred by the function calls. In this way 
> the library can help close users' sensors automatically when the 
> corresponding scope (store, task, thread, etc.) is being de-constructed.
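
For context, a minimal sketch of how a user currently reaches these methods from inside a Processor; the scope/entity/operation strings are placeholders:

{code:java}
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CustomMetricProcessor extends AbstractProcessor<String, String> {
    private Sensor mySensor;

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        final StreamsMetrics metrics = context.metrics();
        // Creates a sensor plus latency/throughput metrics under the given scope.
        mySensor = metrics.addLatencyAndThroughputSensor(
                "my-scope", "my-entity", "my-operation", Sensor.RecordingLevel.DEBUG);
    }

    @Override
    public void process(final String key, final String value) {
        final long startNs = System.nanoTime();
        // ... user logic ...
        context().metrics().recordLatency(mySensor, startNs, System.nanoTime());
    }
}
{code}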



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6824) Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener

2018-04-24 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450453#comment-16450453
 ] 

Anna Povzner commented on KAFKA-6824:
-

Another failure of this test, but now a different reason: (JDK 8)

*18:46:23* kafka.server.DynamicBrokerReconfigurationTest > 
testAddRemoveSslListener FAILED

*18:46:23*     java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

*18:46:23*         at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)

*18:46:23*         at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)

*18:46:23*         at 
org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:953)

*18:46:23*         at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)

*18:46:23*         at scala.collection.Iterator.foreach(Iterator.scala:929)

*18:46:23*         at scala.collection.Iterator.foreach$(Iterator.scala:929)

*18:46:23*         at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1417)

*18:46:23*         at 
scala.collection.IterableLike.foreach(IterableLike.scala:71)

*18:46:23*         at 
scala.collection.IterableLike.foreach$(IterableLike.scala:70)

*18:46:23*         at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)

*18:46:23*         at 
scala.collection.TraversableLike.map(TraversableLike.scala:234)

*18:46:23*         at 
scala.collection.TraversableLike.map$(TraversableLike.scala:227)

*18:46:23*         at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:953)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:816)

*18:46:23*         at 
kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)

*18:46:23* 

*18:46:23*         Caused by:

*18:46:23*         
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition.

> Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener
> --
>
> Key: KAFKA-6824
> URL: https://issues.apache.org/jira/browse/KAFKA-6824
> Project: Kafka
>  Issue Type: Bug
>Reporter: Anna Povzner
>Priority: Major
>
> Saw in my PR build (*JDK 7 and Scala 2.11* ):
> *17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
> testAddRemoveSslListener FAILED
> *17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>
> *17:20:49*         at org.junit.Assert.fail(Assert.java:88)
> *17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)
> *17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)
> *17:20:49*         at 
> kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
> *17:20:50*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6376) Improve Streams metrics for skipped records

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450421#comment-16450421
 ] 

ASF GitHub Bot commented on KAFKA-6376:
---

vvcephei opened a new pull request #4922: KAFKA-6376: Document skipped records 
metrics changes
URL: https://github.com/apache/kafka/pull/4922
 
 
   Document the metrics changes in 
https://github.com/apache/kafka/commit/ed51b2cdf5bdac210a6904bead1a2ca6e8411406 
.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: kip
> Fix For: 1.2.0
>
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop record with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.
> KIP: : 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6705) producer.send() should not block due to metadata not available

2018-04-24 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450350#comment-16450350
 ] 

Dong Lin commented on KAFKA-6705:
-

I am going to close this ticket. Thinking about this more, the benefit of not 
having to wait for metadata does not seem to be worth the complexity this 
ticket would add to the producer. Assuming that the Kafka cluster is available, 
which should be the case, it should be fast to wait for the first metadata. 
After the first metadata fetch, the producer will most likely not have to wait 
for metadata to send messages.
 

> producer.send() should not block due to metadata not available
> --
>
> Key: KAFKA-6705
> URL: https://issues.apache.org/jira/browse/KAFKA-6705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently producer.send() may block on metadata for up to max.block.ms. This 
> behavior is well documented but it is a bit sub-optimal. Since we return a 
> future, we should be able to make producer.send() completely non-blocking. One 
> idea is to simply insert the record into a global queue shared across all 
> partitions, and let the sender thread fetch records from this queue and send 
> them to the broker.
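
For context, a small sketch of the documented behaviour being discussed: send() returns a Future, but the call itself may still block up to max.block.ms while the first metadata for a topic is fetched; the bound can be tightened via configuration (addresses and values below are placeholders):

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class MaxBlockSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000"); // cap the metadata wait at 1s
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The first send() to an unseen topic may block up to max.block.ms while
            // metadata is fetched; later sends usually return immediately.
            Future<RecordMetadata> future =
                    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            System.out.println("offset=" + future.get().offset());
        }
    }
}
{code}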



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6705) producer.send() should not block due to metadata not available

2018-04-24 Thread Dong Lin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-6705.
-
Resolution: Won't Fix

> producer.send() should not block due to metadata not available
> --
>
> Key: KAFKA-6705
> URL: https://issues.apache.org/jira/browse/KAFKA-6705
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently producer.send() may block on metadata for up to max.block.ms. This 
> behavior is well documented but it is a bit sub-optimal. Since we return a 
> future, we should be able to make producer.send() completely non-blocking. One 
> idea is to simply insert the record into a global queue shared across all 
> partitions, and let the sender thread fetch records from this queue and send 
> them to the broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6677) Remove EOS producer config max.in.flight.request.per.connection=1

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450337#comment-16450337
 ] 

ASF GitHub Bot commented on KAFKA-6677:
---

mjsax closed pull request #4868: KAFKA-6677: Fixed streamconfig producer's 
maxinflight allowed when EOS Enabled.
URL: https://github.com/apache/kafka/pull/4868
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 65b1da6dede..e46d6d086ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
@@ -119,7 +120,7 @@
  * 
  *   {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed) - Consumers will always read committed data only
  *   {@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true) - Producer will always have idempotency enabled
- *   {@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"} (1) - Producer will always have one in-flight request per connection
+ *   {@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"} (5) - Producer will always have one in-flight request per connection
  * 
  *
  *
@@ -650,7 +651,6 @@
         final Map tempProducerDefaultOverrides = new HashMap<>(PRODUCER_DEFAULT_OVERRIDES);
         tempProducerDefaultOverrides.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         tempProducerDefaultOverrides.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-        tempProducerDefaultOverrides.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
 
         PRODUCER_EOS_OVERRIDES = Collections.unmodifiableMap(tempProducerDefaultOverrides);
     }
@@ -785,6 +785,10 @@ private void 
checkIfUnexpectedUserSpecifiedConsumerConfig(final Map producerConfigs = 
streamsConfig.getProducerConfigs("clientId");
-assertThat((Integer) 
producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), 
equalTo(1));
-}
-
-@Test
-public void 
shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled() {
-props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
-final StreamsConfig streamsConfig = new StreamsConfig(props);
-final Map 

[jira] [Comment Edited] (KAFKA-6142) Connect worker configurations and connector configurations should accept environment variables

2018-04-24 Thread Robert Yokota (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450299#comment-16450299
 ] 

Robert Yokota edited comment on KAFKA-6142 at 4/24/18 5:59 PM:
---

We might also want variables to be able to refer to properties in the same file:

{{consumer.ssl.truststore.location=$\{producer.ssl.truststore.location}}}

so a different syntax might be 

{code}${ env:env_var_name, sys:sys_prop_name; defaultValue }{code}
 

Note the use of semicolon to delimit the defaultValue.  The use of env: and 
sys: is the same as in Apache Commons Configuration.


was (Author: rayokota):
We might also want variables to be able to refer to properties in the same file:

{{consumer.ssl.truststore.location=${producer.ssl.truststore.location}}}

so a different syntax might be 

{{${ env:env_var_name, sys:sys_prop_name; defaultValue }}}

Note the use of semicolon to delimit the defaultValue.  The use of env: and 
sys: is the same as in Apache Commons Configuration.
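
A rough, hypothetical sketch of the kind of substitution being discussed, using the ${varName,varName:default} form from the issue description (with the env. prefix for environment variables); none of this is the actual proposed implementation:

{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VariableResolverSketch {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}]*)\\}");

    // Resolve ${name,name:default}: try each name in order as "env.<VAR>" or a
    // system property, falling back to the default (empty if none is given).
    public static String resolve(final String value) {
        final Matcher m = VAR.matcher(value);
        final StringBuffer sb = new StringBuffer();
        while (m.find()) {
            final String[] namesAndDefault = m.group(1).split(":", 2);
            String replacement = namesAndDefault.length > 1 ? namesAndDefault[1] : "";
            for (final String name : namesAndDefault[0].split(",")) {
                final String resolved = name.startsWith("env.")
                        ? System.getenv(name.substring("env.".length()))
                        : System.getProperty(name);
                if (resolved != null) {
                    replacement = resolved;
                    break;
                }
            }
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        // e.g. prints "/opt/kafka/config" when KAFKA_HOME is unset
        System.out.println(resolve("${env.KAFKA_HOME:/opt/kafka}/config"));
    }
}
{code}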

> Connect worker configurations and connector configurations should accept 
> environment variables
> --
>
> Key: KAFKA-6142
> URL: https://issues.apache.org/jira/browse/KAFKA-6142
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> Currently, when a worker or connector configuration is parsed, the values are 
> used as-is without any kind of pre-processing before the value is used. It 
> should be possible to define configuration properties such that string 
> literal values or default values can use *_configuration variables_* that 
> reference environment variables and/or system properties, and that these 
> configuration variables are resolved/replaced before the configuration value 
> is used.
> I propose doing this enhancement in Kafka client's {{ConfigDef}} by adding a 
> {{ConfigDef.Transformer}} interface:
> {code:java}
> /**
>  * Transform the configuration value.
>  */
> public interface Transformer {
> /**
>  * Transform the configuration value.
>  * @param name The name of the configuration
>  * @param value The value of the configuration
>  * @return the preprocessed value
>  * @throws ConfigException if the value is invalid.
>  */
> Object apply(String name, Object value);
> }
> {code}
> and then allowing {{Transformer}} implementations to be passed to 
> {{ConfigDef.define(...)}} such that all existing signatures are maintained for 
> backward compatibility. By default, the definition would use an identity 
> transform that simply returns the value. The transformers would be called in 
> {{ConfigDef.parseValue(...)}} before the {{parseType(...)}} method is called, 
> and would also be called on the default value if one is provided.
> Then, a {{ConfigDef.ReplaceSystemVariables}} implementation would be provided 
> to look in {{String}} values for zero or more variables defined with this 
> EBNF grammar:
> {noformat}
> '$' '{' varName { ',' varName } [ ':' [ defaultValue] ] '}'
> varName ::=  varChar+
> defaultValue ::= varChar*
> varChar ::= any character excluding ',', ':', '}', and '\' except when escaped
> {noformat}
> where:
> * {{varName}} is the name of a Java system property or {{env.}} followed by 
> the name of an environment variable, and 
> * {{defaultValue}} specifies the replacement value used when no environment 
> variable or system property is found, and defaults to an empty string if the 
> `:` is included in the variable
> The value of the first system property or environment variable resolved is 
> then used to replace the variable expression. This implementation would have 
> trace or debug level logging to describe what it is doing.
> Here are several examples:
> ||Variable||Variable replaced with||
> |$\{env.KAFKA_HOME\}| the value of the {{KAFKA_HOME}} environment variable or 
> a null value if that variable doesn't exist |
> |$\{foo.prop1,foo.prop2,env.MY_ENV_VAR:value\}| the value of the 
> {{foo.prop1}} system property if it exists, or with the value of the 
> {{foo.prop2}} system property if it exists, or with the value of the 
> {{MY_ENV_VAR}} environment variable if it exists, or {{value}} if none of the 
> system properties exist |
> |$\{foo.prop1,foo.prop2,env.MY_ENV_VAR:\} | the value of the {{foo.prop1}} 
> system property if it exists, or with the value of the {{foo.prop2}} system 
> property if it exists, or with the value of the {{MY_ENV_VAR}} environment 
> variable if it exists, or an empty string if none of the system properties 
> exist |
> |$\{foo.prop1,foo.prop2,env.MY_ENV_VAR\} | the value of the {{foo.prop1}} 

[jira] [Created] (KAFKA-6824) Transient failure in DynamicBrokerReconfigurationTest.testAddRemoveSslListener

2018-04-24 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6824:
---

 Summary: Transient failure in 
DynamicBrokerReconfigurationTest.testAddRemoveSslListener
 Key: KAFKA-6824
 URL: https://issues.apache.org/jira/browse/KAFKA-6824
 Project: Kafka
  Issue Type: Bug
Reporter: Anna Povzner


Saw in my PR build (*JDK 7 and Scala 2.11* ):

*17:20:49* kafka.server.DynamicBrokerReconfigurationTest > 
testAddRemoveSslListener FAILED

*17:20:49*     java.lang.AssertionError: expected:<10> but was:<12>

*17:20:49*         at org.junit.Assert.fail(Assert.java:88)

*17:20:49*         at org.junit.Assert.failNotEquals(Assert.java:834)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:645)

*17:20:49*         at org.junit.Assert.assertEquals(Assert.java:631)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)

*17:20:49*         at 
kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)

*17:20:50*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6142) Connect worker configurations and connector configurations should accept environment variables

2018-04-24 Thread Robert Yokota (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450299#comment-16450299
 ] 

Robert Yokota commented on KAFKA-6142:
--

We might also want variables to be able to refer to properties in the same file:

{{consumer.ssl.truststore.location=${producer.ssl.truststore.location}}}

so a different syntax might be 

{{${ env:env_var_name, sys:sys_prop_name; defaultValue }}}

Note the use of semicolon to delimit the defaultValue.  The use of env: and 
sys: is the same as in Apache Commons Configuration.
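
For illustration only (a hypothetical Connect worker properties snippet, not something proposed in this ticket verbatim), the two ideas above could look like this:
{noformat}
# reuse the producer's truststore for the consumer (reference to a property in the same file)
producer.ssl.truststore.location=/var/private/ssl/truststore.jks
consumer.ssl.truststore.location=${producer.ssl.truststore.location}

# try the TRUSTSTORE_PASSWORD env var, then the truststore.password system property,
# then fall back to the default after the semicolon
consumer.ssl.truststore.password=${ env:TRUSTSTORE_PASSWORD, sys:truststore.password; changeit }
{noformat}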

> Connect worker configurations and connector configurations should accept 
> environment variables
> --
>
> Key: KAFKA-6142
> URL: https://issues.apache.org/jira/browse/KAFKA-6142
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> Currently, when a worker or connector configuration is parsed, the values are 
> used as-is without any kind of pre-processing before the value is used. It 
> should be possible to define configuration properties such that string 
> literal values or default values can use *_configuration variables_* that 
> reference environment variables and/or system properties, and that these 
> configuration variables are resolved/replaced before the configuration value 
> is used.
> I propose doing this enhancement in Kafka client's {{ConfigDef}} by adding a 
> {{ConfigDef.Transformer}} interface:
> {code:java}
> /**
>  * Transform the configuration value.
>  */
> public interface Transformer {
> /**
>  * Transform the configuration value.
>  * @param name The name of the configuration
>  * @param value The value of the configuration
>  * @return the preprocessed value
>  * @throws ConfigException if the value is invalid.
>  */
> Object apply(String name, Object value);
> }
> {code}
> and then allowing {{Transformer}} implementations to be passed to 
> {{ConfigDef.define(...)}} such that all existing signatures are maintained for 
> backward compatibility. By default, the definition would use an identity 
> transform that simply returns the value. The transformers would be called in 
> {{ConfigDef.parseValue(...)}} before the {{parseType(...)}} method is called, 
> and would also be called on the default value if one is provided.
> Then, a {{ConfigDef.ReplaceSystemVariables}} implementation would be provided 
> to look in {{String}} values for zero or more variables defined with this 
> EBNF grammar:
> {noformat}
> '$' '{' varName { ',' varName } [ ':' [ defaultValue] ] '}'
> varName ::=  varChar+
> defaultValue ::= varChar*
> varChar ::= any character excluding ',', ':', '}', and '\' except when escaped
> {noformat}
> where:
> * {{varName}} is the name of a Java system property or {{env.}} followed by 
> the name of an environment variable, and 
> * {{defaultValue}} specifies the replacement value used when no environment 
> variable or system property is found, and defaults to an empty string if the 
> {{:}} is included in the variable
> The value of the first system property or environment variable resolved is 
> then used to replace the variable expression. This implementation would have 
> trace or debug level logging to describe what it is doing.
> Here are several examples:
> ||Variable||Variable replaced with||
> |$\{env.KAFKA_HOME\}| the value of the {{KAFKA_HOME}} environment variable or 
> a null value if that variable doesn't exist |
> |$\{foo.prop1,foo.prop2,env.MY_ENV_VAR:value\}| the value of the 
> {{foo.prop1}} system property if it exists, or the value of the 
> {{foo.prop2}} system property if it exists, or the value of the 
> {{MY_ENV_VAR}} environment variable if it exists, or {{value}} if none of 
> them exist |
> |$\{foo.prop1,foo.prop2,env.MY_ENV_VAR:\} | the value of the {{foo.prop1}} 
> system property if it exists, or the value of the {{foo.prop2}} system 
> property if it exists, or the value of the {{MY_ENV_VAR}} environment 
> variable if it exists, or an empty string if none of them exist |
> |$\{foo.prop1,foo.prop2,env.MY_ENV_VAR\} | the value of the {{foo.prop1}} 
> system property if it exists, or the value of the {{foo.prop2}} system 
> property if it exists, or the value of the {{MY_ENV_VAR}} environment 
> variable if it exists, or a null value if none of them exist |
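
As a minimal, illustrative sketch only (it assumes the {{Transformer}} interface proposed above and ignores escaping and the empty-string-vs-null distinction from the table; the class body is hypothetical, not an actual Kafka implementation), the replacement transformer could look roughly like this:
{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReplaceSystemVariables implements Transformer {
    // Matches ${var1,var2:default}; escaping is intentionally not handled in this sketch
    private static final Pattern VAR = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    @Override
    public Object apply(String name, Object value) {
        if (!(value instanceof String))
            return value;                               // only string values are scanned
        Matcher matcher = VAR.matcher((String) value);
        StringBuffer result = new StringBuffer();
        while (matcher.find()) {
            String replacement = matcher.group(2) == null ? "" : matcher.group(2); // default value
            for (String varName : matcher.group(1).split(",")) {
                String resolved = varName.startsWith("env.")
                        ? System.getenv(varName.substring("env.".length()))
                        : System.getProperty(varName);
                if (resolved != null) {
                    replacement = resolved;             // first variable that resolves wins
                    break;
                }
            }
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);
        return result.toString();
    }
}
{code}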



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2018-04-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450300#comment-16450300
 ] 

Matthias J. Sax commented on KAFKA-5998:


Actual processing should not be affected. Only in the case of a failure might 
restore take longer, as the checkpoint files encode what data is in the store -- 
without the checkpoint file, the whole changelog topic has to be re-read to 
guarantee a correct restore.
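
For context, the checkpoint is a small text file in the task's state directory. An illustrative example of its typical contents (a version line, an entry count, then one {{topic partition offset}} line per changelog partition; the topic names and offsets below are made up):
{noformat}
0
2
rtp-kafkastreams-mystore-changelog 0 1042
rtp-kafkastreams-mystore-changelog 1 987
{noformat}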

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt
>
>
> I have one kafka broker and one kafka stream running... It has been running 
> for two days under a load of around 2500 msgs per second. On the third day I 
> am getting the below exception for some of the partitions. I have 16 
> partitions; only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> 09:43:25.974 [ks_0_inst-StreamThread-15] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_0/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.<init>(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  

[jira] [Created] (KAFKA-6823) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize

2018-04-24 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6823:
---

 Summary: Transient failure in 
DynamicBrokerReconfigurationTest.testThreadPoolResize
 Key: KAFKA-6823
 URL: https://issues.apache.org/jira/browse/KAFKA-6823
 Project: Kafka
  Issue Type: Bug
Reporter: Anna Povzner


Saw in my PR build (*JDK 10 and Scala 2.12*):

*15:58:46* kafka.server.DynamicBrokerReconfigurationTest > testThreadPoolResize 
FAILED

*15:58:46*     java.lang.AssertionError: Invalid threads: expected 6, got 7: 
List(ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-1, 
ReplicaFetcherThread-0-2, ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-2, 
ReplicaFetcherThread-1-0, ReplicaFetcherThread-0-1)

*15:58:46*         at org.junit.Assert.fail(Assert.java:88)

*15:58:46*         at org.junit.Assert.assertTrue(Assert.java:41)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1147)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.maybeVerifyThreadPoolSize$1(DynamicBrokerReconfigurationTest.scala:412)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:431)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.reducePoolSize$1(DynamicBrokerReconfigurationTest.scala:417)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:440)

*15:58:46*         at 
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:156)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:439)

*15:58:46*         at 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:453)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450117#comment-16450117
 ] 

ASF GitHub Bot commented on KAFKA-6526:
---

rajinisivaram opened a new pull request #4920: KAFKA-6526: Enable unclean 
leader election without controller change
URL: https://github.com/apache/kafka/pull/4920
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update controller to handle changes to unclean.leader.election.enable
> -
>
> Key: KAFKA-6526
> URL: https://issues.apache.org/jira/browse/KAFKA-6526
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 1.2.0
>
>
> At the moment, updates to the default unclean.leader.election.enable use the 
> same code path as updates to topic overrides. This requires a controller 
> change for the new value to take effect. It would be good if we can update 
> the controller to handle the change.
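
For reference, a sketch of the two update paths being compared (flags as in the dynamic broker configuration documentation of that era; hosts and topic name are placeholders):
{noformat}
# cluster-wide dynamic default
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default \
  --alter --add-config unclean.leader.election.enable=true

# per-topic override
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic \
  --alter --add-config unclean.leader.election.enable=true
{noformat}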



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6786) Remove additional configs in StreamsStandbyTask

2018-04-24 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16450057#comment-16450057
 ] 

Matthias J. Sax commented on KAFKA-6786:


Thanks for your interest [~abhi_sh]! I assigned the Jira to you. Also added you 
to the list of contributors. Feel free to assign unassigned Jiras to yourself 
if you want to pick them up.

> Remove additional configs in StreamsStandbyTask
> ---
>
> Key: KAFKA-6786
> URL: https://issues.apache.org/jira/browse/KAFKA-6786
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Abhishek Sharma
>Priority: Minor
>  Labels: newbie
>
> Since we are now passing a property file into the streams service 
> initialization code, we do not need to pass those configs as additional 
> properties in StreamsStandbyTask. We can refactor this test to get rid of the 
> additional properties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6786) Remove additional configs in StreamsStandbyTask

2018-04-24 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6786?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6786:
--

Assignee: Abhishek Sharma

> Remove additional configs in StreamsStandbyTask
> ---
>
> Key: KAFKA-6786
> URL: https://issues.apache.org/jira/browse/KAFKA-6786
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Abhishek Sharma
>Priority: Minor
>  Labels: newbie
>
> Since we are now passing a property file into the streams service 
> initialization code, we do not need to pass those configs as additional 
> properties in StreamsStandbyTask. We can refactor this test to get rid of the 
> additional properties.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
start of microservice.
 Our configuration is Kafka 1.0.0 but microservices are built with Kafka 
Streams 0.10.2.1.
 We had to reset the stream offsets in order unblock microservices 'cause 
restarts didn't help.

We faced the problem only once and didn't have a chance to reproduce it, so 
we're sorry in advance for maybe poor explanations.

Below are details that we've managed to collect that time:

Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

That said, we had a situation when StateStore (0.10.2.1) stuck in loading data. 
The reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
spinning in consumer loop 'cause this condition never happened:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.
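
As an illustration of the mismatch described above (hypothetical offsets, purely for illustration): if the records at the tail of a changelog partition are never returned to the consumer, the estimated position and the broker-reported end offset diverge, so the equality check above never fires:
{code:java}
// Hypothetical numbers, for illustration only.
long endOffset = 100L;                     // what endOffsets() reports for the changelog
long lastReturnedOffset = 95L;             // highest offset actually returned by poll()

long estimatedPosition = lastReturnedOffset + 1;   // 0.10.2.1: lastOffset + 1 -> 96
long brokerProvidedNext = 100L;                    // 1.0.0: nextFetchOffset from the broker

boolean restoreDone = (estimatedPosition == endOffset);   // 96 == 100 -> false forever
{code}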

  was:
We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
start of microservice.
 Our configuration is Kafka 1.0.0 but microservices are built with Kafka 
Streams 0.10.2.1.
 We had to reset the stream offsets in order unblock microservices 'cause 
restarts didn't help.

We faced the problem only once and didn't had a chence to reproduce it, so 
we're sorry in advance for maybe poor explanations.

Below are details that we've managed to collect that time:

Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

That said, we had a situation when StateStore (0.10.2.1) stuck in loading data. 
The reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
spinning in consumer loop 'cause this condition never happened:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
> start of microservice.
>  Our configuration is Kafka 1.0.0 but microservices are built with Kafka 
> Streams 0.10.2.1.
>  We had to reset the stream offsets in order unblock microservices 'cause 
> restarts didn't help.
> We faced the problem only once and didn't have a chance to reproduce it, so 
> we're sorry in advance for maybe poor explanations.
> Below are details that we've managed to collect that time:
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> That said, we had a situation when StateStore (0.10.2.1) stuck in loading 
> data. The reason was in {{ProcessorStateManager.restoreActiveState:245}} 
> which kept spinning in consumer loop 'cause this condition never happened:
> {code:java}
>    } else if 

[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
start of microservice.
 Our configuration is Kafka 1.0.0 but microservices are built with Kafka 
Streams 0.10.2.1.
 We had to reset the stream offsets in order unblock microservices 'cause 
restarts didn't help.

We faced the problem only once and didn't had a chence to reproduce it, so 
we're sorry in advance for maybe poor explanations.

Below are details that we've managed to collect that time:

Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

That said, we had a situation when StateStore (0.10.2.1) stuck in loading data. 
The reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
spinning in consumer loop 'cause this condition never happened:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.

  was:
We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
start of microservice.
Our configuration is Kafka 1.0.0 but microservices are built with Kafka Streams 
0.10.2.1.
We have to reset the stream offsets in order unblock microservices 'cause 
restarts didn't help.

We faced the problem only once and didn't had a chence to reproduce it, so 
we're sorry in advance for maybe poor explanations.

Below are details that we've managed to collect that time:

Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

That said, we had a situation when StateStore (0.10.2.1) stuck in loading data. 
The reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
spinning in consumer loop 'cause this condition never happened:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
> start of microservice.
>  Our configuration is Kafka 1.0.0 but microservices are built with Kafka 
> Streams 0.10.2.1.
>  We had to reset the stream offsets in order unblock microservices 'cause 
> restarts didn't help.
> We faced the problem only once and didn't had a chence to reproduce it, so 
> we're sorry in advance for maybe poor explanations.
> Below are details that we've managed to collect that time:
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> That said, we had a situation when StateStore (0.10.2.1) stuck in loading 
> data. The reason was in {{ProcessorStateManager.restoreActiveState:245}} 
> which kept spinning in consumer loop 'cause this condition never happened:
> {code:java}
>    } else if 

[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
start of microservice.
Our configuration is Kafka 1.0.0 but microservices are built with Kafka Streams 
0.10.2.1.
We have to reset the stream offsets in order unblock microservices 'cause 
restarts didn't help.

We faced the problem only once and didn't had a chence to reproduce it, so 
we're sorry in advance for maybe poor explanations.

Below are details that we've managed to collect that time:

Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

That said, we had a situation when StateStore (0.10.2.1) stuck in loading data. 
The reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
spinning in consumer loop 'cause this condition never happened:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happened:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> We faced the problem when StateStore (0.10.2.1) stuck in loading data during 
> start of microservice.
> Our configuration is Kafka 1.0.0 but microservices are built with Kafka 
> Streams 0.10.2.1.
> We have to reset the stream offsets in order unblock microservices 'cause 
> restarts didn't help.
> We faced the problem only once and didn't had a chence to reproduce it, so 
> we're sorry in advance for maybe poor explanations.
> Below are details that we've managed to collect that time:
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> That said, we had a situation when StateStore (0.10.2.1) stuck in loading 
> data. The reason was in {{ProcessorStateManager.restoreActiveState:245}} 
> which kept spinning in consumer loop 'cause this condition never happened:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  
> We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
> compaction. 
>  Or there is inconsistency between offsets calculation between 0.10.2.1 and 
> 1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5677) Remove deprecated punctuate method

2018-04-24 Thread Jimin Hsieh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5677?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jimin Hsieh reassigned KAFKA-5677:
--

Assignee: Jimin Hsieh

> Remove deprecated punctuate method
> --
>
> Key: KAFKA-5677
> URL: https://issues.apache.org/jira/browse/KAFKA-5677
> Project: Kafka
>  Issue Type: Task
>Reporter: Michal Borowiecki
>Assignee: Jimin Hsieh
>Priority: Major
>
> Task to track the removal of the punctuate method that got deprecated in 
> KAFKA-5233 and associated unit tests.
> (not sure the fix version number at this point)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
Or there is inconsistency between offsets calculation between .


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> We had a situation when StateStore (0.10.2.1) stuck in loading data. The 
> reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
> spinning in consumer loop 'cause this condition never happens:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  
> We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
> compaction. 
>  Or there is inconsistency between offsets calculation between 0.10.2.1 and 
> 1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
Or there is inconsistency between offsets calculation between .

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction.


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> We had a situation when StateStore (0.10.2.1) stuck in loading data. The 
> reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
> spinning in consumer loop 'cause this condition never happens:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  
> We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
> compaction. 
> Or there is inconsistency between offsets calculation between .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction.

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction.

 


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> We had a situation when StateStore (0.10.2.1) stuck in loading data. The 
> reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
> spinning in consumer loop 'cause this condition never happens:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  
> We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
> compaction.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction.

 

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> We had a situation when StateStore (0.10.2.1) stuck in loading data. The 
> reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
> spinning in consumer loop 'cause this condition never happens:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  
> We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
> compaction.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

We had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

If had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> We had a situation when StateStore (0.10.2.1) stuck in loading data. The 
> reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
> spinning in consumer loop 'cause this condition never happens:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

If had a situation when StateStore (0.10.2.1) stuck in loading data. The reason 
was in {{ProcessorStateManager.restoreActiveState:245}} which kept spinning in 
consumer loop 'cause this condition never happens:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

If had a situation when StateStore ()

This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps 
spinning in consumer loop 'cause this condition will never happen:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> If had a situation when StateStore (0.10.2.1) stuck in loading data. The 
> reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
> spinning in consumer loop 'cause this condition never happens:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

 

If had a situation when StateStore ()

This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps 
spinning in consumer loop 'cause this condition will never happen:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps 
spinning in consumer loop 'cause this condition will never happen:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> Get the latest offset from records (which were got from {{poll}}) plus 1.
>  So the next offset is estimated.
> In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset but not estimated.
>  
> If had a situation when StateStore ()
> This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps 
> spinning in consumer loop 'cause this condition will never happen:
> {code:java}
>    } else if (restoreConsumer.position(storePartition) == endOffset) {
>    break;
>    }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)
Phil Mikhailov created KAFKA-6822:
-

 Summary: Kafka Consumer 0.10.2.1 can not normally read data from 
Kafka 1.0.0
 Key: KAFKA-6822
 URL: https://issues.apache.org/jira/browse/KAFKA-6822
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Phil Mikhailov


Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps 
spinning in consumer loop 'cause this condition will never happen:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

 

 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6822) Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0

2018-04-24 Thread Phil Mikhailov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
--
Description: 
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps 
spinning in consumer loop 'cause this condition will never happen:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

  was:
Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps 
spinning in consumer loop 'cause this condition will never happen:
{code:java}
   } else if (restoreConsumer.position(storePartition) == endOffset) {
   break;
   }
{code}
 

 

 

 

 

 


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> ---
>
> Key: KAFKA-6822
> URL: https://issues.apache.org/jira/browse/KAFKA-6822
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Phil Mikhailov
>Priority: Major
>
> Kafka consumer 0.10.2.1 calculates the next offset like this ({{Fetcher:524}}):
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> It takes the offset of the last record returned by {{poll}} and adds 1, so the next offset is only an estimate.
> In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset rather than an estimate.
> This is why {{ProcessorStateManager.restoreActiveState:245}} keeps spinning in the consumer loop because this condition is never satisfied:
> {code:java}
> } else if (restoreConsumer.position(storePartition) == endOffset) {
>     break;
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6821) The producer attempted to use a producer id which is not currently assigned to its transactional id

2018-04-24 Thread RandySun (JIRA)
RandySun created KAFKA-6821:
---

 Summary: The producer attempted to use a producer id which is not 
currently assigned to its transactional id 
 Key: KAFKA-6821
 URL: https://issues.apache.org/jira/browse/KAFKA-6821
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
 Environment: Centos 7
Reporter: RandySun


I use Kafka Streams to join two KStreams; however, I found the following error stack 
trace in my application log:
{code:java}
Aborting producer batches due to fatal error
org.apache.kafka.common.KafkaException: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
    at org.apache.kafka.clients.producer.internals.TransactionManager$AddPartitionsToTxnHandler.handleResponse(TransactionManager.java:1037)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:905)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
{code}
 

There is no evidence of any other error in my application, and the Kafka 
controller.log says:
{code:java}
[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Preferred replicas by broker Map(2 -> Map(__consumer_offsets-19 -> Vector(2, 3, 1), com.randy.Demo1-KSTREAM-FLATMAP-01-repartition-1 -> Vector(2)

[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred replica Map() (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred replica Map() (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] DEBUG [Controller id=2] Topics not in preferred replica Map() (kafka.controller.KafkaController)
[2018-04-17 17:54:33,212] TRACE [Controller id=2] Leader imbalance ratio for broker 3 is 0.0 (kafka.controller.KafkaController)
{code}
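
For context, here is a minimal sketch of the kind of exactly-once KStream-KStream join described above, written against the 1.0.0 Streams API. Topic names, serdes, the application id and the join window are assumptions, not details from this report; the relevant part is {{processing.guarantee=exactly_once}}, which is what makes Streams create the transactional producers whose fatal {{InvalidPidMappingException}} appears in the stack trace:
{code:java}
// Hypothetical setup sketch; topic names, serdes and window size are assumptions.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

public class JoinSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-join");          // assumed id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumed address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Exactly-once processing is what turns on transactional producers.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-topic");    // assumed topic
        KStream<String, String> right = builder.stream("right-topic");  // assumed topic

        // Windowed join of the two streams; the joined result goes to an output topic.
        left.join(right,
                  (l, r) -> l + "," + r,
                  JoinWindows.of(60_000L),
                  Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("joined-topic");                                        // assumed topic

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}
This only shows the setup; the report itself does not identify what triggers the {{InvalidPidMappingException}}.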


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread

2018-04-24 Thread Dong Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449370#comment-16449370
 ] 

Dong Lin commented on KAFKA-6795:
-

Thanks [~apovzner]! Nice catch. You are right that the correct behavior is to 
truncate futureReplica based on the future replica's epochCache and latestEpoch. I 
will review the patch tomorrow.

 

> Add unit test for ReplicaAlterLogDirsThread
> ---
>
> Key: KAFKA-6795
> URL: https://issues.apache.org/jira/browse/KAFKA-6795
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Major
>
> ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but 
> there is no unit test. 
> [~lindong] I assigned this to myself, since ideally I wanted to add unit 
> tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6813) Remove deprecated APIs from KIP-120 and KIP-182 in Streams

2018-04-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449366#comment-16449366
 ] 

ASF GitHub Bot commented on KAFKA-6813:
---

guozhangwang opened a new pull request #4919: [WIP] KAFKA-6813: Remove 
deprecated APIs in KIP-120 and KIP-182
URL: https://github.com/apache/kafka/pull/4919
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove deprecated APIs from KIP-120 and KIP-182 in Streams
> --
>
> Key: KAFKA-6813
> URL: https://issues.apache.org/jira/browse/KAFKA-6813
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.0.0
>
>
> As we move on to the next major release 2.0, we can consider removing the 
> deprecated APIs from KIP-120 and KIP-182.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6795) Add unit test for ReplicaAlterLogDirsThread

2018-04-24 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16449275#comment-16449275
 ] 

Anna Povzner edited comment on KAFKA-6795 at 4/24/18 6:03 AM:
--

[~lindong] Before reviewing the unit tests, could you please confirm the intended 
behavior of ReplicaAlterLogDirsThread.buildLeaderEpochRequest()? I think the right 
behavior is to build the offset-for-leader-epoch request based on the *future* 
replica's epochCache and latestEpoch(). Since the *future* replica is the fetching 
replica, it should request an offset based on its own latestEpoch() rather than 
the latestEpoch() of the topic partition's current replica. fetchEpochsFromLeader() 
then correctly fetches endOffsetFor() based on the latest epoch of the *future* 
replica. Otherwise, it basically fetches the offset based on the "leader" epoch.

The PR includes the proposed fix.


was (Author: apovzner):
[~lindong] Before reviewing the unit tests, could you please confirm the intended 
behavior of ReplicaAlterLogDirsThread.buildLeaderEpochRequest()? I think the right 
behavior is to build the offset-for-leader-epoch request based on the *future* 
replica's epochCache and latestEpoch(). Since the *future* replica is the fetching 
replica, it should request an offset based on its own latestEpoch() rather than 
the latestEpoch() of the topic partition's current replica. fetchEpochsFromLeader() 
then correctly fetches endOffsetFor() based on the latest epoch of the *future* 
replica. Otherwise, it basically fetches the offset based on the "leader" epoch.

It was actually hard to notice; I only caught it after I realized that I was mocking 
latestEpoch() of the source replica rather than the destination. The PR currently has 
tests that follow the current code (meaning they pass), but if you agree about the 
implementation of ReplicaAlterLogDirsThread.buildLeaderEpochRequest(), then I will 
need to fix the tests as well.

I think the right implementation is to replace:
{{private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get)}}
with
{{private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp, Request.FutureLocalReplicaId).map(_.epochs.get)}}

 

> Add unit test for ReplicaAlterLogDirsThread
> ---
>
> Key: KAFKA-6795
> URL: https://issues.apache.org/jira/browse/KAFKA-6795
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Anna Povzner
>Assignee: Anna Povzner
>Priority: Major
>
> ReplicaAlterLogDirsThread was added as part of KIP-113 implementation, but 
> there is no unit test. 
> [~lindong] I assigned this to myself, since ideally I wanted to add unit 
> tests for KAFKA-6361 related changes (KIP-279), but feel free to re-assign. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)