[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2018-07-26 Thread Andy Bryant (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559225#comment-16559225
 ] 

Andy Bryant commented on KAFKA-2260:


 This would prove very handy in event-sourcing-based designs.

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Ben Kirwin
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> change, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
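The check-and-set semantics proposed above can be sketched as a toy in-memory log. This is illustrative only: {{ExpectedOffsetLog}} and its API are hypothetical, not Kafka code; -1 plays the sigil role described in the proposal.

```java
// Hypothetical sketch of the broker-side check described above: the producer
// attaches an expected offset; the server assigns the real next offset and
// fails the append on mismatch. -1 keeps today's "next available" behaviour.
class ExpectedOffsetLog {
    static final long ANY_OFFSET = -1L; // sigil: current behaviour

    private final java.util.List<String> records = new java.util.ArrayList<>();

    // Returns the offset actually assigned, or throws on a CAS failure.
    synchronized long append(String record, long expectedOffset) {
        long nextOffset = records.size();
        if (expectedOffset != ANY_OFFSET && expectedOffset != nextOffset) {
            throw new IllegalStateException("CAS failure: expected offset "
                + expectedOffset + " but next offset is " + nextOffset);
        }
        records.add(record);
        return nextOffset;
    }
}
```

With this rule, a retried write either lands at exactly the expected position or fails loudly, which is the at-most-once retry guarantee described in the motivation.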



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7207) Make rate & total metrics documentation consistent

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559105#comment-16559105
 ] 

ASF GitHub Bot commented on KAFKA-7207:
---

KevinLiLu opened a new pull request #5429: KAFKA-7207: Make -rate & -total 
metrics documentation consistent
URL: https://github.com/apache/kafka/pull/5429
 
 
   https://issues.apache.org/jira/browse/KAFKA-7207
   
   Some sections of the `Monitoring` metrics documentation list out the 
`-total` metrics, and some sections do not list them out. We should make them 
consistent and list out the missing `-total` metrics.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make rate & total metrics documentation consistent
> --
>
> Key: KAFKA-7207
> URL: https://issues.apache.org/jira/browse/KAFKA-7207
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Trivial
>
> [KIP-187|https://cwiki-test.apache.org/confluence/display/KAFKA/KIP-187+-+Add+cumulative+count+metric+for+all+Kafka+rate+metrics]
>  added an additional cumulative total metric for each existing rate metric.
> In the [Monitoring|https://kafka.apache.org/documentation/#monitoring] 
> section of the docs, there is a mix of some sections only listing out the 
> rate metric, and some sections listing out both the total metric and the rate 
> metric. 
> We should make all the sections consistent and list out all of the total 
> metrics.





[jira] [Updated] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-26 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-7192:
-
Fix Version/s: 2.1.0

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Priority: Critical
>  Labels: bugs
> Fix For: 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]





[jira] [Resolved] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-26 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7192.
--
Resolution: Fixed

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]





[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559096#comment-16559096
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang closed pull request #5421: KAFKA-7192: Wipe out if EOS is turned 
on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5421
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 188ff473038..94e4c71d9c2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -137,6 +137,10 @@ public String toString() {
 return toString("");
 }
 
+public boolean isEosEnabled() {
+return eosEnabled;
+}
+
 /**
  * Produces a string representation containing useful information about a 
Task starting with the given indent.
  * This is useful in debugging scenarios.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 33dce9e7558..c1a41cefc23 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -55,6 +55,10 @@ public TopicPartition partition() {
 return partition;
 }
 
+public String storeName() {
+return storeName;
+}
+
 long checkpoint() {
 return checkpoint == null ? NO_CHECKPOINT : checkpoint;
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 07af8019aef..1927b5a7af7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -71,7 +71,7 @@ public void register(final StateRestorer restorer) {
 
 public Collection restore(final RestoringTasks active) {
 if (!needsInitializing.isEmpty()) {
-initialize();
+initialize(active);
 }
 
 if (needsRestoring.isEmpty()) {
@@ -111,7 +111,7 @@ public void register(final StateRestorer restorer) {
 return completed();
 }
 
-private void initialize() {
+private void initialize(final RestoringTasks active) {
 if (!restoreConsumer.subscription().isEmpty()) {
 throw new StreamsException("Restore consumer should not be 
subscribed to any topics (" + restoreConsumer.subscription() + ")");
 }
@@ -165,11 +165,12 @@ private void initialize() {
 
 // set up restorer for those initializable
 if (!initializable.isEmpty()) {
-startRestoration(initializable);
+startRestoration(initializable, active);
 }
 }
 
-private void startRestoration(final Map 
initialized) {
+private void startRestoration(final Map 
initialized,
+  final RestoringTasks active) {
 log.debug("Start restoring state stores from changelog topics {}", 
initialized.keySet());
 
 final Set assignment = new 
HashSet<>(restoreConsumer.assignment());
@@ -186,6 +187,18 @@ private void startRestoration(final Map initializ
 
restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
 restorer.restoreStarted();
 } else {
+final StreamTask task = 
active.restoringTaskFor(restorer.partition());
+
+// If checkpoint does not exist it means the task was not 
shutdown gracefully before;
+// and in this case if EOS is turned on we should wipe out the 
state and re-initialize the task
+if (task.isEosEnabled()) {
+log.info("No checkpoint found for task {} state store {} 
changelog {} with EOS turned on. " +
+"Reinitializing the task and restore its state 
from the beginning.", task.id, restorer.storeName(), restorer.partition());
+
task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
+} else {
+log.info("Restoring task {}'s state store {} from 
beginning of the changelog {} ", task.id, 

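The branch added by the diff above reduces to a small decision rule; a minimal sketch (illustrative names, not the actual Streams internals): if a checkpoint exists, restore from it; if it is missing, the previous run did not shut down gracefully, so with exactly-once enabled the local state must be wiped and rebuilt rather than restored on top of possibly-uncommitted writes.

```java
// Sketch of the restore decision introduced by the fix for KAFKA-7192.
class RestoreDecision {
    enum Action { RESTORE_FROM_CHECKPOINT, RESTORE_FROM_BEGINNING, WIPE_AND_REINITIALIZE }

    // checkpointOffset is null when no checkpoint file exists for the store.
    static Action decide(Long checkpointOffset, boolean eosEnabled) {
        if (checkpointOffset != null) {
            return Action.RESTORE_FROM_CHECKPOINT;
        }
        // No checkpoint: the task was not shut down gracefully before.
        return eosEnabled ? Action.WIPE_AND_REINITIALIZE
                          : Action.RESTORE_FROM_BEGINNING;
    }
}
```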
[jira] [Commented] (KAFKA-3514) Stream timestamp computation needs some further thoughts

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16559039#comment-16559039
 ] 

ASF GitHub Bot commented on KAFKA-3514:
---

guozhangwang opened a new pull request #5428: KAFKA-3514: Part III, Refactor 
StreamThread main loop
URL: https://github.com/apache/kafka/pull/5428
 
 
   




> Stream timestamp computation needs some further thoughts
> 
>
> Key: KAFKA-3514
> URL: https://issues.apache.org/jira/browse/KAFKA-3514
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: architecture
>
> Our current stream task's timestamp is used for punctuate function as well as 
> selecting which stream to process next (i.e. best effort stream 
> synchronization). And it is defined as the smallest timestamp over all 
> partitions in the task's partition group. This results in two unintuitive 
> corner cases:
> 1) observing a late-arrived record would keep that stream's timestamp low for 
> a period of time, and hence that stream keeps being processed until that late record is dequeued. For 
> example take two partitions within the same task annotated by their 
> timestamps:
> {code}
> Stream A: 5, 6, 7, 8, 9, 1, 10
> {code}
> {code}
> Stream B: 2, 3, 4, 5
> {code}
> The late arrived record with timestamp "1" will cause stream A to be selected 
> continuously in the thread loop, i.e. messages with timestamp 5, 6, 7, 8, 9 
> until the record itself is dequeued and processed, then stream B will be 
> selected starting with timestamp 2.
> 2) an empty buffered partition will cause its timestamp to not advance, and 
> hence the task timestamp as well, since it is the smallest among all 
> partitions. This may not be a severe problem compared with 1) above though.
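The selection rule behind corner case 1) can be sketched directly: a partition's timestamp is the smallest timestamp among its buffered records, and the task always processes the head record of the partition with the smallest such timestamp. Names here are illustrative, not the actual Streams internals.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TaskSimulator {
    // Drains all buffers, returning the order in which records (identified by
    // their timestamps) are processed under the min-timestamp selection rule.
    static List<Long> processAll(List<Deque<Long>> buffers) {
        List<Long> order = new ArrayList<>();
        while (true) {
            int best = -1;
            long bestTs = Long.MAX_VALUE;
            for (int i = 0; i < buffers.size(); i++) {
                // Partition timestamp = min over all buffered records.
                long min = Long.MAX_VALUE;
                for (long ts : buffers.get(i)) min = Math.min(min, ts);
                if (!buffers.get(i).isEmpty() && min < bestTs) {
                    bestTs = min;
                    best = i;
                }
            }
            if (best < 0) return order;          // all buffers drained
            order.add(buffers.get(best).pollFirst()); // process head record
        }
    }
}
```

Feeding in streams A = 5, 6, 7, 8, 9, 1, 10 and B = 2, 3, 4, 5 from the example reproduces the unintuitive order: A is drained through the late record "1" before any of B's earlier-timestamped records run.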
> *Update*
> There is one more thing to consider (full discussion found here: 
> http://search-hadoop.com/m/Kafka/uyzND1iKZJN1yz0E5?subj=Order+of+punctuate+and+process+in+a+stream+processor)
> {quote}
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using custom timestamp extractor, but that timestamp is just a wall 
> clock time
> Imagine the following events:
> 1., for 10 seconds I send in 5 messages / second
> 2., I do not send any messages for 3 seconds
> 3., I start sending the 5 messages / second again
> I see that punctuate() is not called during the 3 seconds when I do not 
> send any messages. This is ok according to the documentation, because 
> there is not any new messages to trigger the punctuate() call. When the 
> first few messages arrive after I restart the sending (point 3. above), I 
> see the following sequence of method calls:
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
> What I would expect instead is that punctuate() is called first and then 
> process() is called on the messages, because the first message's timestamp 
> is already 3 seconds older than when the last punctuate() was called, so the 
> first message belongs after the 3 punctuate() calls.
> {quote}





[jira] [Updated] (KAFKA-5875) Consumer group repeatedly fails to join, even across JVM restarts: BufferUnderFlowException reading the {{version}} field in the consumer protocol header

2018-07-26 Thread Ray Chiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ray Chiang updated KAFKA-5875:
--
Component/s: consumer

> Consumer group repeatedly fails to join, even across JVM restarts: 
> BufferUnderFlowException reading the {{version}} field in the consumer 
> protocol header
> -
>
> Key: KAFKA-5875
> URL: https://issues.apache.org/jira/browse/KAFKA-5875
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Evan Pollan
>Priority: Major
>
> I've seen this maybe once a month in our large cluster of Kubernetes-based Kafka 
> consumers & producers.  Every once in a while a consumer in a Kubernetes 
> "pod" gets this error trying to join a consumer group:
> {code}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
>  version : 0.11.0.0","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.173+","logger":"org.apache.kafka.common.utils.AppInfoParser","message":"Kafka
>  commitId : cb8625948210849f","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+","logger":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","message":"Revoking
>  previously assigned partitions [] for group 
> conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:42.178+","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"(Re-)joining
>  group conv-fetch-jobs-runner-for-internal","exception":""}
> {"level":"INFO","@timestamp":"2017-09-12T13:45:43.588+","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"Successfully
>  joined group conv-fetch-jobs-runner-for-internal with generation 
> 17297","exception":""}
> {"errorType":"Error reading field 'version': 
> java.nio.BufferUnderflowException","level":"ERROR","message":"Died!","operation":"Died!","stacktrace":"org.apache.kafka.common.protocol.types.SchemaException:
>  Error reading field 'version': java.nio.BufferUnderflowException\n\tat 
> org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)\n\tat 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)\n\tat
>  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)\n\tat
>  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)\n\tat
>  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)\n\tat
>  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)\n\tat
>  
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)\n\tat
>  
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)\n\tat
>  
> com.spredfast.common.kafka.consumer.RunnableConsumer.pollOnce(RunnableConsumer.java:141)\n\tat
>  
> com.spredfast.common.kafka.consumer.RunnableConsumer.access$000(RunnableConsumer.java:28)\n\tat
>  
> com.spredfast.common.kafka.consumer.RunnableConsumer$Loop.iterate(RunnableConsumer.java:125)\n\tat
>  
> com.spredfast.common.kafka.consumer.RunnableConsumer.run(RunnableConsumer.java:78)\n\tat
>  
> java.lang.Thread.run(Thread.java:745)\n","trackingId":"dead-consumer","logger":"com.spredfast.common.kafka.consumer.RunnableConsumer","loggingVersion":"UNKNOWN","component":"MetricsFetch","pid":25,"host":"fetch-2420457278-sh4f5","@timestamp":"2017-09-12T13:45:43.613Z"}
> {code}
> Pardon the log format -- these get sucked into logstash, thus the JSON.
> Here's the raw stacktrace: 
> {code}
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
>   at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:220)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>   at 
> 
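The failure mode in the stacktrace above can be reproduced minimally: the consumer protocol's assignment schema begins with a 16-bit version field, so an empty or truncated assignment buffer underflows on the very first read. A sketch (the helper name is illustrative; only the {{ByteBuffer}} behaviour is the point):

```java
import java.nio.ByteBuffer;

class VersionRead {
    // Mirrors the first step of deserializing the consumer protocol header:
    // reading the 16-bit version field. Throws java.nio.BufferUnderflowException
    // when fewer than 2 bytes remain in the buffer.
    static short readVersion(ByteBuffer assignment) {
        return assignment.getShort();
    }
}
```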

[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558874#comment-16558874
 ] 

Yogesh BG commented on KAFKA-7209:
--

Here are my observations:

3 brokers and 3 stream apps - initially working fine.

If I kill one app, the group gets rebalanced and starts streaming without loss of data.

I could see the logs below:

{code}
20:15:26.627 [ks_0_inst_CSV_LOG-StreamThread-22] INFO  o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [PR-35, PR_vThunder-27, PR-27, PR_vThunder-35] for group aggregation-framework03_CSV_LOG
20:15:26.627 [ks_0_inst_CSV_LOG-StreamThread-20] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [ks_0_inst_CSV_LOG-StreamThread-20] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED.
20:16:32.174 [ks_0_inst_THUNDER_LOG_L7-StreamThread-90] INFO  o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [THUNDER_LOG_L7_PR-15, THUNDER_LOG_L7_PE-15] for group aggregation-framework03_THUNDER_LOG_L7
20:16:32.175 [ks_0_inst_THUNDER_LOG_L7-StreamThread-86] INFO  o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [THUNDER_LOG_L7_PR-9, THUNDER_LOG_L7_PE-9] for group aggregation-framework03_THUNDER_LOG_L7
20:16:32.175 [ks_0_inst_THUNDER_LOG_L7-StreamThread-85] INFO  o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [THUNDER_LOG_L7_PE-35, THUNDER_LOG_L7_PR-27, THUNDER_LOG_L7_PE-27, THUNDER_LOG_L7_PR-35] for group aggregation-framework03_THUNDER_LOG_L7
{code}

 

But the thing I don't get is that when I look into the state dir, I don't see the 
partition folders getting created for the newly assigned partitions.

Below is the initial state before I kill the first one [rtp-worker-2]; for the other 
two it remains the same and does not change at all.

 

{code}
[root@rtp-worker-2 /]# ls /tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/
0_0  0_10  0_12  0_14  0_16  0_18  0_2   0_21  0_23  0_25  0_27  0_29  0_30  0_32  0_34  0_4  0_6  0_8
0_1  0_11  0_13  0_15  0_17  0_19  0_20  0_22  0_24  0_26  0_28  0_3   0_31  0_33  0_35  0_5  0_7  0_9
[root@rtp-worker-0 /]# ls /tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/
0_0  0_1  0_10  0_11  0_2  0_3  0_4  0_5  0_6  0_7  0_8  0_9
[root@rtp-worker-1 /]# ls /tmp/data/kstreams/aggregation-framework_THUNDER_LOG_L7/
0_11  0_12  0_13  0_14  0_15  0_16  0_17  0_18  0_19  0_20  0_21  0_22  0_23
{code}

 

Another case: with all 3 apps running successfully, I bring down one broker, and the 
broker cluster rebalances itself. The apps also get rebalanced along with the brokers and 
start streaming data, *but data loss is observed while the broker 
rebalancing is happening. Is there a way to avoid this? Do the other two brokers become 
non-responsive while the cluster is rebalancing?*

 

*Next: when a broker and a stream app go down at the same time, 
I can see the brokers get rebalanced and some communication messages being 
received by the apps, but they never get back to streaming, especially when multiple 
partitions are there; the topics which have one partition get back to streaming 
in some time.*

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1, session timeout 6, retries set to Int.MAX and 
> backoff time default.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers,
> and I am running 3 Kafka Streams apps with the same application.id.
> Each node has one broker and one Kafka Streams application.
> Everything works fine during setup.
> I bring down one node, so one Kafka broker and one streaming app are down.
> Now I see exceptions in the other two streaming apps, and the group never gets 
> rebalanced - I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close(), cleanup 
> and restart; this also doesn't help.
> Can anyone help me?
>  
>  
>  
>  One thing I observed lately is that Kafka topics with one partition get 
> reassigned, but I have topics with 16 partitions and replication factor 3. They 
> never settle.





[jira] [Assigned] (KAFKA-5662) We should be able to specify min.insync.replicas for the __consumer_offsets topic

2018-07-26 Thread Stanislav Kozlovski (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski reassigned KAFKA-5662:
--

Assignee: Stanislav Kozlovski

> We should be able to specify min.insync.replicas for the __consumer_offsets 
> topic
> -
>
> Key: KAFKA-5662
> URL: https://issues.apache.org/jira/browse/KAFKA-5662
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Stanislav Kozlovski
>Priority: Major
>  Labels: needs-kip
>
> The transaction log has a {{transaction.state.log.min.isr}} setting to 
> control the min.isr for the transaction log (by default the min.isr is 2 and 
> replication.factor is 3).
> Unfortunately, we don't have a similar setting for the offsets topic. We 
> should add the following {{offsets.topic.min.isr}} setting and default that 
> to 2 so that we have durability on the offsets topic. 
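Until such a broker-level setting exists, the override can be applied at the topic level; a sketch using the era's ZooKeeper-based tool (host/port and exact flags are illustrative and vary by Kafka version):

```shell
# Sketch: set a topic-level min.insync.replicas override on the internal
# offsets topic. Verify flag names against your Kafka version's docs.
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --alter --entity-type topics --entity-name __consumer_offsets \
  --add-config min.insync.replicas=2
```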





[jira] [Comment Edited] (KAFKA-3841) MirrorMaker topic renaming

2018-07-26 Thread David (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558795#comment-16558795
 ] 

David edited comment on KAFKA-3841 at 7/26/18 7:29 PM:
---

Just wanted to mention related info about the message handler option: 
[https://www.opencore.com/blog/2017/1/170131-mirrormaker-change-topic/]


was (Author: daluu):
Just wanted to mention related info about the MessageHandler option: 
https://www.opencore.com/blog/2017/1/170131-mirrormaker-change-topic/

> MirrorMaker topic renaming
> --
>
> Key: KAFKA-3841
> URL: https://issues.apache.org/jira/browse/KAFKA-3841
> Project: Kafka
>  Issue Type: New Feature
>  Components: tools
>Affects Versions: 0.10.0.0
>Reporter: Ning Zhang
>Priority: Major
>
> Our organization (walmart.com) has been a Kafka user for some years, 
> and MirrorMaker has been a convenient tool to bring our Kafka data from one 
> Kafka cluster to another cluster.
> In many of our use cases, the mirrored topic from the source cluster may not 
> want to have the same name in the target cluster. This could be a valid 
> scenario when the same topic name already exists on the target cluster, or we 
> want to append the name of the data center to the topic name in the target 
> cluster, such as "grocery_items_mirror_sunnyvale", to explicitly identify the 
> source (e.g. sunnyvale) and nature (e.g. mirroring) of the topic.
> We have implemented the MirrorMaker topic renaming feature internally, and it 
> has been used in production for a couple of years. While keeping our 
> internal Kafka fork with the above "renaming" branch across version upgrades 
> does not cost us too much labor, we think it may be meaningful to contribute 
> back to the community, since potentially many people may have a similar 
> expectation and could benefit from this feature.
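At its core, such a renaming feature boils down to a pure topic-name mapping applied per record; a real MirrorMaker message handler would apply this to each record's destination topic before producing. A minimal sketch (the suffix scheme follows the example in the description; the handler plumbing and the {{dataCenter}} parameter are illustrative):

```java
// Topic-renaming rule as a pure function. A MirrorMaker handler would call
// this when constructing the ProducerRecord for the target cluster.
class TopicRenamer {
    static String rename(String sourceTopic, String dataCenter) {
        return sourceTopic + "_mirror_" + dataCenter;
    }
}
```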





[jira] [Commented] (KAFKA-3841) MirrorMaker topic renaming

2018-07-26 Thread David (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16558795#comment-16558795
 ] 

David commented on KAFKA-3841:
--

Just wanted to mention related info about the MessageHandler option: 
https://www.opencore.com/blog/2017/1/170131-mirrormaker-change-topic/

> MirrorMaker topic renaming
> --
>
> Key: KAFKA-3841
> URL: https://issues.apache.org/jira/browse/KAFKA-3841
> Project: Kafka
>  Issue Type: New Feature
>  Components: tools
>Affects Versions: 0.10.0.0
>Reporter: Ning Zhang
>Priority: Major
>
> Our organization (walmart.com) has been a Kafka user for some years, 
> and MirrorMaker has been a convenient tool to bring our Kafka data from one 
> Kafka cluster to another cluster.
> In many of our use cases, the mirrored topic from the source cluster may not 
> want to have the same name in the target cluster. This could be a valid 
> scenario when the same topic name already exists on the target cluster, or we 
> want to append the name of the data center to the topic name in the target 
> cluster, such as "grocery_items_mirror_sunnyvale", to explicitly identify the 
> source (e.g. sunnyvale) and nature (e.g. mirroring) of the topic.
> We have implemented the MirrorMaker topic renaming feature internally, and it 
> has been used in production for a couple of years. While keeping our 
> internal Kafka fork with the above "renaming" branch across version upgrades 
> does not cost us too much labor, we think it may be meaningful to contribute 
> back to the community, since potentially many people may have a similar 
> expectation and could benefit from this feature.





[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558772#comment-16558772
 ] 

Yogesh BG commented on KAFKA-7209:
--

In one of the forums I saw the statement below. Does this impact anything? We 
keep the default value of 3 for these configurations.

"What is your offsets.topic.replication.factor and 
transaction.state.log.replication.factor? If those are set to 3 and you have 
fewer than 3 brokers, I don't imagine things will go well."
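For reference, these are broker-side settings in server.properties. The fragment below shows the defaults discussed above; it is a sketch for context, not configuration taken from this thread, and the values are only safe when at least that many brokers are alive:

```properties
# server.properties (per broker). These replication factors must not exceed
# the number of live brokers, or creation of the internal topics will fail.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
```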

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use Kafka Streams 0.11.0.1 with session timeout 6, retries set to 
> Integer.MAX_VALUE, and the default backoff time.
>  
> I have 3 nodes running a Kafka cluster of 3 brokers, and I am running 3 Kafka 
> Streams instances with the same 
> [application.id|http://application.id/]
> Each node hosts one broker and one Kafka Streams application.
> Everything works fine during setup.
> When I bring down one node, so one Kafka broker and one streaming app are down,
> I see exceptions in the other two streaming apps and the group never gets 
> rebalanced. I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close(), cleanup, 
> and restart; this doesn't help either.
> Can anyone help me?
>  
>  One thing I observed lately is that topics with one partition get 
> reassigned, but I have topics with 16 partitions and replication factor 3, 
> and they never settle.





[jira] [Comment Edited] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558752#comment-16558752
 ] 

Yogesh BG edited comment on KAFKA-7209 at 7/26/18 6:51 PM:
---

When I kill one of the brokers, its partitions get rebalanced among the other 
two brokers.

And when I launch a new app with a different name than the previous one, it 
streams successfully.

I am a bit confused about whether the existing streaming apps pick up the 
rebalanced topic info and work accordingly; I am running the test and will 
update you soon.


was (Author: yogeshbelur):
when i kill the one of the broker it get rebalances its partitions among the 
other two ingestors

and i launch a new app with different name than previous one then its 
successfully streams

i am bit confused abt existing streaming app whether they get rebalnced topic 
info and work accordingly, i am doing the test will update u soon

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>





[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558748#comment-16558748
 ] 

Matthias J. Sax commented on KAFKA-7209:


Would be good to verify. If broker-only and streams-only fail-over works, it 
seems to be a bug if a "double fail-over" does not work. It's unclear, though, 
whether it's a Streams, consumer, or broker bug. Do the brokers fail over 
correctly if you kill the machine (i.e., broker and Streams at once)?

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>





[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558731#comment-16558731
 ] 

Yogesh BG commented on KAFKA-7209:
--

Is there any manual workaround to resolve this? I think killing only the 
broker, or killing only the application, works.

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>





[jira] [Commented] (KAFKA-6014) new consumer mirror maker halts after committing offsets to a deleted topic

2018-07-26 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558706#comment-16558706
 ] 

Jason Gustafson commented on KAFKA-6014:


[~felixgv]  It's a good question. In the worst case, it would probably hang 
because MM uses `commitSync` internally without a timeout. However, now that we 
have KIP-266, this timeout can be overridden using the consumer's 
`default.api.timeout.ms` configuration. MM probably needs to be updated to 
handle TimeoutException in this case though. I went ahead and opened KAFKA-7211 
if you are interested.
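The handling described here could look roughly like the sketch below. The `Commit` interface stands in for a real bounded `consumer.commitSync(offsets, timeout)` call, and the fixed retry count is an assumption for illustration, not MirrorMaker's actual behavior:

```java
import java.util.concurrent.TimeoutException;

public class CommitWithTimeoutSketch {
    // Stand-in for a bounded consumer.commitSync(offsets, timeout) call.
    interface Commit {
        void run() throws TimeoutException;
    }

    // Retry a bounded commit a fixed number of times instead of letting a
    // TimeoutException propagate and kill the mirroring process.
    static boolean tryCommit(Commit commit, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                commit.run();
                return true;
            } catch (TimeoutException e) {
                // The topic may have been deleted; a real implementation could
                // check whether it still exists and drop the offset if not.
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // A commit that always times out exhausts its attempts and returns false.
        System.out.println(tryCommit(() -> { throw new TimeoutException(); }, 3)); // prints false
    }
}
```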

> new consumer mirror maker halts after committing offsets to a deleted topic
> ---
>
> Key: KAFKA-6014
> URL: https://issues.apache.org/jira/browse/KAFKA-6014
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Jason Gustafson
>Priority: Major
>
> New consumer throws an unexpected KafkaException when trying to commit to a 
> topic that has been deleted. MirrorMaker.commitOffsets doesn't attempt to 
> catch the KafkaException and just kills the process. We didn't see this with 
> the old consumer because it silently drops failed offset commits.
> I ran a quick experiment locally to prove the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect: KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
> public class OffsetCommitTopicDeletionTest {
>     public static void main(String[] args) throws InterruptedException {
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
>         TopicPartition partition = new TopicPartition("t", 0);
>         List<TopicPartition> partitions = Collections.singletonList(partition);
>         kafkaConsumer.assign(partitions);
>         while (true) {
>             kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
>             Thread.sleep(1000);
>         }
>     }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] Offset commit failed on partition t-0 at offset 0: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition t-0 may not exist or user may not have Describe access to topic
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>   at 
> 

[jira] [Created] (KAFKA-7211) MM should handle timeouts in commitSync

2018-07-26 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-7211:
--

 Summary: MM should handle timeouts in commitSync
 Key: KAFKA-7211
 URL: https://issues.apache.org/jira/browse/KAFKA-7211
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Now that we have KIP-266, the user can override `default.api.timeout.ms` for 
the consumer so that commitSync does not block indefinitely. MM needs to be 
updated to handle TimeoutException. We may also need some logic to handle 
deleted topics: if MM attempts to commit an offset for a deleted topic, the 
call will time out, and we should probably check whether the topic exists and 
remove the offset if it doesn't.







[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558702#comment-16558702
 ] 

Matthias J. Sax commented on KAFKA-7209:


Not sure at the moment. Note that it is not recommended to run brokers and 
Streams applications on the same machine. Does it work if you only kill the 
broker, or if you only kill the Streams application? Also, can you try out 
0.11.0.3?

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7210) Add system test for log compaction

2018-07-26 Thread Manikumar (JIRA)
Manikumar created KAFKA-7210:


 Summary: Add system test for log compaction
 Key: KAFKA-7210
 URL: https://issues.apache.org/jira/browse/KAFKA-7210
 Project: Kafka
  Issue Type: Improvement
Reporter: Manikumar
Assignee: Manikumar
 Fix For: 2.1.0


Currently we have the TestLogCleaning tool for stress-testing log compaction. 
This JIRA is to integrate the tool into the system tests.





[jira] [Resolved] (KAFKA-7126) Reduce number of rebalance for large consumer groups after a topic is created

2018-07-26 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7126.
-
Resolution: Fixed

> Reduce number of rebalance for large consumer groups after a topic is created
> -
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with pattern-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumers over a 5-minute period. This causes the MM to lag significantly 
> behind during those 5 minutes, and the clusters may be considerably 
> out of sync during this period.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created, and conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance, based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic with 10 partitions is created in the cluster, and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after a metadata 
> refresh and triggers a rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are assigned 1 
> partition of this topic; the other 190 consumers are not assigned any 
> partition of it. At this moment, the newly created topic will appear in 
> `ConsumerCoordinator.subscriptions.subscription` for those consumers that 
> were assigned a partition of this topic or that refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers have refreshed metadata before 
> the leader of the consumer group did. Thus around 100 + 10 = 110 consumers 
> have the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`; the other 90 consumers do 
> not.
> 5) For those 90 consumers, any consumer that refreshes metadata will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata at almost the same 
> time, they jointly trigger one rebalance; otherwise, they each trigger a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worst case, 
> which is probably also the average case if the number of consumers in the 
> group is large, the last consumer will refresh its metadata 5 minutes after 
> T0, and rebalances will repeat throughout this 5-minute interval.
>  
>  
>  
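The dynamics in steps 4) through 6) above can be sketched with a toy simulation. This is purely illustrative with assumed numbers, not Kafka code: each straggler's metadata refresh triggers a rebalance unless it lands inside the window of a rebalance already in flight, in which case it piggy-backs on that one:

```java
import java.util.TreeSet;

public class RebalanceStormSketch {
    // Toy model: each metadata refresh by a consumer that does not yet know the
    // new topic triggers a rebalance, unless it lands inside the window of a
    // rebalance that is already in flight (those refreshes piggy-back on it).
    static int countRebalances(long[] refreshTimesMs, long windowMs) {
        TreeSet<Long> sorted = new TreeSet<>();
        for (long t : refreshTimesMs) {
            sorted.add(t);
        }
        int rebalances = 0;
        long windowEnd = Long.MIN_VALUE;
        for (long t : sorted) {
            if (t >= windowEnd) {      // refresh outside any in-flight rebalance
                rebalances++;
                windowEnd = t + windowMs;
            }
        }
        return rebalances;
    }

    public static void main(String[] args) {
        // 9 stragglers refreshing 30s apart within metadata.max.age.ms = 5 min:
        // each refresh falls outside the previous 10s window, so 9 rebalances.
        long[] refreshes = new long[9];
        for (int i = 0; i < 9; i++) {
            refreshes[i] = (i + 1) * 30_000L;
        }
        System.out.println(countRebalances(refreshes, 10_000L)); // prints 9
    }
}
```

With widely spread refresh times every straggler pays for its own rebalance, which is the storm the description reports; refreshes clustered inside one window collapse into a single rebalance, which is what forcing an early metadata refresh in the fix aims for.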





[jira] [Updated] (KAFKA-7126) Reduce number of rebalance for large consumer groups after a topic is created

2018-07-26 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7126:

Summary: Reduce number of rebalance for large consumer groups after a topic 
is created  (was: Reduce number of rebalance period for large consumer groups 
after a topic is created)

> Reduce number of rebalance for large consumer groups after a topic is created
> -
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
> Attachments: 1.diff
>
>





[jira] [Commented] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558652#comment-16558652
 ] 

ASF GitHub Bot commented on KAFKA-7126:
---

lindong28 closed pull request #5408: KAFKA-7126: Reduce number of rebalance for 
large consumer group after a topic is created
URL: https://github.com/apache/kafka/pull/5408
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 6c663cfac93..c8fa66b7e3b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -124,15 +124,27 @@ public synchronized void add(String topic) {
 }
 }
 
+/**
+ * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+ *
+ * @param nowMs current time in ms
+ * @return remaining time in ms till the cluster info can be updated again
+ */
+public synchronized long timeToAllowUpdate(long nowMs) {
+    return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+}
+
 /**
  * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
  * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
  * is now
+ *
+ * @param nowMs current time in ms
+ * @return remaining time in ms till updating the cluster info
  */
-public synchronized long timeToNextUpdate(long nowMs) {
+public synchronized long  timeToNextUpdate(long nowMs) {
     long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-    long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
-    return Math.max(timeToExpire, timeToAllowUpdate);
+    return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
 }
 
 /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index f9b77e921cd..8f25d6e8eee 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -328,6 +328,16 @@ public boolean poll(final long timeoutMs) {
             // we need to ensure that the metadata is fresh before joining initially. This ensures
             // that we have matched the pattern against the cluster's topics at least once before joining.
             if (subscriptions.hasPatternSubscription()) {
+                // For consumer group that uses pattern-based subscription, after a topic is created,
+                // any consumer that discovers the topic after metadata refresh can trigger rebalance
+                // across the entire consumer group. Multiple rebalances can be triggered after one topic
+                // creation if consumers refresh metadata at vastly different times. We can significantly
+                // reduce the number of rebalances caused by single topic creation by asking consumer to
+                // refresh metadata before re-joining the group as long as the refresh backoff time has
+                // passed.
+                if (this.metadata.timeToAllowUpdate(currentTime) == 0) {
+                    this.metadata.requestUpdate();
+                }
                 if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                     return false;
                 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ba392c6f4cb..cec56b07e00 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -513,6 +513,50 @@ public boolean matches(AbstractRequest body) {
         assertEquals(newAssignmentSet, rebalanceListener.assigned);
     }
 
+    @Test
+    public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
+        // Set up a non-leader consumer with pattern subscription and a cluster containing one topic matching the
+        // pattern.
+

[jira] [Created] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-26 Thread Yogesh BG (JIRA)
Yogesh BG created KAFKA-7209:


 Summary: Kafka stream does not rebalance when one node gets down
 Key: KAFKA-7209
 URL: https://issues.apache.org/jira/browse/KAFKA-7209
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.11.0.1
Reporter: Yogesh BG


I use Kafka Streams 0.11.0.1 with session timeout 6, retries set to 
Integer.MAX_VALUE, and the default backoff time.
 
I have 3 nodes running a Kafka cluster of 3 brokers,

and I am running 3 Kafka Streams instances with the same 
[application.id|http://application.id/]
Each node hosts one broker and one Kafka Streams application.
Everything works fine during setup.
When I bring down one node, so one Kafka broker and one streaming app are down,
I see exceptions in the other two streaming apps and the group never gets 
rebalanced. I waited for hours and it never comes back to normal.
Is there anything I am missing?
I also tried, when one broker is down, calling stream.close(), cleanup, and 
restart; this doesn't help either.
Can anyone help me?
 
 One thing I observed lately is that topics with one partition get reassigned, 
but I have topics with 16 partitions and replication factor 3, and they never 
settle.





[jira] [Updated] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created

2018-07-26 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin updated KAFKA-7126:

Fix Version/s: 2.1.0
   2.0.0

> Reduce number of rebalance period for large consumer groups after a topic is 
> created
> 
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0, 2.1.0
>
> Attachments: 1.diff
>
>





[jira] [Commented] (KAFKA-4690) IllegalStateException using DeleteTopicsRequest when delete.topic.enable=false

2018-07-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558608#comment-16558608
 ] 

ASF GitHub Bot commented on KAFKA-4690:
---

omkreddy closed pull request #5130: KAFKA-4690: Return new error code for 
DeleteTopicRequest when topic deletion disabled.
URL: https://github.com/apache/kafka/pull/5130
 
 
   


diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
new file mode 100644
index 000..41577d2a288
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/TopicDeletionDisabledException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class TopicDeletionDisabledException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public TopicDeletionDisabledException() {
+    }
+
+    public TopicDeletionDisabledException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 5db1d314be3..e0542d85ec2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -79,6 +79,7 @@
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
 import org.apache.kafka.common.errors.TransactionCoordinatorFencedException;
@@ -624,6 +625,13 @@ public ApiException build(String message) {
 public ApiException build(String message) {
 return new InvalidFetchSessionEpochException(message);
 }
+    }),
+    TOPIC_DELETION_DISABLED(72, "Topic deletion is disabled.",
+        new ApiExceptionBuilder() {
+            @Override
+            public ApiException build(String message) {
+                return new TopicDeletionDisabledException(message);
+            }
 });
 
 private interface ApiExceptionBuilder {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9f1ab62f03d..72238e1965a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1530,6 +1530,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 (topic, Errors.NOT_CONTROLLER)
   }.toMap
   sendResponseCallback(results)
+} else if (!config.deleteTopicEnable) {
+  val results = deleteTopicRequest.topics.asScala.map { topic =>
+    (topic, Errors.TOPIC_DELETION_DISABLED)
+  }.toMap
+  sendResponseCallback(results)
 } else {
   // If no authorized topics return immediately
   if (authorizedForDeleteTopics.isEmpty)
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
new file mode 100644
index 000..37914ac844f
--- /dev/null
+++ 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0

[jira] [Resolved] (KAFKA-4690) IllegalStateException using DeleteTopicsRequest when delete.topic.enable=false

2018-07-26 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-4690.
--
Resolution: Duplicate

Resolving as duplicate of KAFKA-5975.

> IllegalStateException using DeleteTopicsRequest when delete.topic.enable=false
> --
>
> Key: KAFKA-4690
> URL: https://issues.apache.org/jira/browse/KAFKA-4690
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0
> Environment: OS X
>Reporter: Jon Chiu
>Assignee: Manikumar
>Priority: Major
> Attachments: delete-topics-request.java
>
>
> There is no indication as to why the delete request fails. Perhaps an error 
> code?
> This can be reproduced with the following steps:
> 1. Start ZK and 1 broker (with default {{delete.topic.enable=false}})
> 2. Create a topic test
> {noformat}
> bin/kafka-topics.sh --zookeeper localhost:2181 \
>   --create --topic test --partitions 1 --replication-factor 1
> {noformat}
> 3. Delete topic by sending a DeleteTopicsRequest
> 4. An error is returned
> {noformat}
> org.apache.kafka.common.errors.DisconnectException
> {noformat}
> or 
> {noformat}
> java.lang.IllegalStateException: Attempt to retrieve exception from future 
> which hasn't failed
>   at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99)
>   at 
> io.confluent.adminclient.KafkaAdminClient.send(KafkaAdminClient.java:195)
>   at 
> io.confluent.adminclient.KafkaAdminClient.deleteTopic(KafkaAdminClient.java:152)
> {noformat}





[jira] [Commented] (KAFKA-5621) The producer should retry expired batches when retries are enabled

2018-07-26 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558593#comment-16558593
 ] 

Ismael Juma commented on KAFKA-5621:


KAFKA-5886, which is another way of fixing the underlying problem, has been 
merged.

> The producer should retry expired batches when retries are enabled
> --
>
> Key: KAFKA-5621
> URL: https://issues.apache.org/jira/browse/KAFKA-5621
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Sumant Tambe
>Priority: Major
>
> Today, when a batch is expired in the accumulator, a {{TimeoutException}} is 
> raised to the user.
> It might be better for the producer to retry the expired batch, up to the 
> configured number of retries. This is more intuitive from the user's point of 
> view. 
> Further the proposed behavior makes it easier for applications like mirror 
> maker to provide ordering guarantees even when batches expire. Today, they 
> would resend the expired batch and it would get added to the back of the 
> queue, causing the output ordering to be different from the input ordering.
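For context, a hedged sketch of the producer settings that interact with batch expiry and retry ordering (values are illustrative only; `delivery.timeout.ms` assumes a client version that includes KAFKA-5886):

```properties
# Illustrative producer settings related to batch expiry and retries.
retries=2147483647
retry.backoff.ms=100
# Added by KAFKA-5886: an upper bound on the total time to send a record,
# including all retries, replacing per-batch expiry in the accumulator.
delivery.timeout.ms=120000
# Keeps output ordering stable across retries (the mirror-maker concern above).
max.in.flight.requests.per.connection=1
```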





[jira] [Commented] (KAFKA-7134) KafkaLog4jAppender - Appender exceptions are propagated to caller

2018-07-26 Thread Attila Sasvari (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558325#comment-16558325
 ] 

Attila Sasvari commented on KAFKA-7134:
---

[~venkatpotru] please note that if the underlying producer cannot connect to a 
Kafka broker (because the broker is not running), {{send()}} will fail with a 
{{TimeoutException}}. The producer notices this and tries to log it:
https://github.com/apache/kafka/blob/1d9a427225c64e7629a4eb2e2d129d5551185049/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L876

However, if the {{rootLogger}} in {{log4j.properties}} is set to the 
{{KafkaLog4jAppender}}, the appender will try to send that log message to Kafka 
as well, which creates an infinite loop. 
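The feedback loop is easiest to see in a minimal {{log4j.properties}} sketch (broker address, topic, and the scoped-logger workaround are illustrative; only the appender class name comes from the Kafka repository):

```properties
# Root logger routes ALL records, including the Kafka producer's own
# error logging, through the Kafka appender:
log4j.rootLogger=INFO, KAFKA
log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.KAFKA.brokerList=localhost:9092
log4j.appender.KAFKA.topic=app-logs
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%d %p %c - %m%n

# When the broker is down, the appender's internal producer logs a
# TimeoutException, which the root logger feeds back into the same
# appender: an infinite loop. Routing org.apache.kafka.* to another
# appender breaks the cycle:
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
log4j.logger.org.apache.kafka=WARN, stdout
log4j.additivity.org.apache.kafka=false
```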

> KafkaLog4jAppender - Appender exceptions are propagated to caller
> -
>
> Key: KAFKA-7134
> URL: https://issues.apache.org/jira/browse/KAFKA-7134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: venkata praveen
>Assignee: Andras Katona
>Priority: Major
>
> KafkaLog4jAppender exceptions are propagated to the caller when Kafka is 
> down/slow/etc., which may cause the application to crash. Ideally the 
> appender should print and ignore the exception, or should provide an option 
> to ignore/throw exceptions, like the 'ignoreExceptions' property of 
> https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender





[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-26 Thread lambdaliu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558265#comment-16558265
 ] 

lambdaliu commented on KAFKA-7190:
--

Hello [~guozhang]. My team develops a cloud version of Kafka and I am familiar 
with the broker, so I think I can probably solve this issue.

When we remove the head of the log, we take the following steps in 
ProducerStateManager.truncateHead:

1. Remove producerIds whose last offset is smaller than the log start offset.
2. Remove a producerId's BatchMetadata that have a last offset smaller than the 
log start offset.
3. Remove ongoing transactions whose producerIds were removed in step 1.
4. Remove unreplicated transactions whose last offset is smaller than the log 
start offset.
5. Update lastMapOffset to the log start offset if lastMapOffset is smaller 
than the log start offset.
6. Delete snapshot files older than the new log start offset.

As you suggested, we can delay the deletion of producerIds until they expire. 
We can also delay steps 2 and 3 until that time.

For the old snapshot files in step 6, we can rely on the periodically called 
function deleteSnapshotsAfterRecoveryPointCheckpoint to delete them. And when 
loading producer state from a snapshot file, we would not drop producerIds 
whose last offset is smaller than the log start offset. 

So we only need to do steps 4 and 5 when removing the head of the log.

For the additional PID expiration config, is there any reason to add it? If it 
is reasonable, I will add it.
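A minimal stdlib model of the proposal above (class, field, and method names here are invented for illustration and are not the real ProducerStateManager): truncateHead performs only steps 4 and 5, leaving producer entries to time-based expiration:

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of the proposed change: truncateHead keeps producer entries
// (steps 1-3 are deferred to time-based PID expiration) and performs only
// steps 4 and 5 when the head of the log is removed.
public class ProducerStateSketch {
    final Map<Long, Long> lastOffsetByPid = new TreeMap<>();           // pid -> last offset
    final Map<Long, Long> unreplicatedTxnLastOffset = new TreeMap<>(); // pid -> txn last offset
    long lastMapOffset = 0L;

    void truncateHead(long logStartOffset) {
        // Step 4: drop unreplicated transactions that ended before the new start.
        unreplicatedTxnLastOffset.values().removeIf(last -> last < logStartOffset);
        // Step 5: never let lastMapOffset fall behind the log start offset.
        lastMapOffset = Math.max(lastMapOffset, logStartOffset);
        // Steps 1-3 and snapshot deletion (step 6) are intentionally NOT done
        // here; producer entries survive so UNKNOWN_PRODUCER_ID is avoided.
    }
}
```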

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time when this producer tries to
> send, it will get this UNKNOWN_PRODUCER_ID error code, but in this case,
> this error is retriable: the producer would just get a new producer id and
> retries, and then this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the producerIds for a more extended period, or on the streams side, 
> developing a more conservative approach to deleting offsets from repartition 
> topics.
>  
>  





[jira] [Resolved] (KAFKA-7208) AlterConfigsRequest with null config value throws NullPointerException

2018-07-26 Thread Sean Policarpio (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Policarpio resolved KAFKA-7208.

Resolution: Not A Problem

Oops, I realised I misunderstood how this API request works; ConfigEntries 
represents the entire map of overrides you want to apply to the resource (a 
topic in this case). So to delete an override, it just needs to be left out of 
the map submitted through the API, in which case the default value takes over.

The only remaining issue is that the documentation is still wrong, in that it 
says the config value is a nullable String.
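The replace-the-whole-map semantics described above can be sketched with plain Java (a model of the observed behaviour, not broker code):

```java
import java.util.HashMap;
import java.util.Map;

public class AlterConfigsSemantics {

    // AlterConfigs replaces the full per-resource override map: any override
    // absent from the submitted map is dropped, so lookups fall back to the
    // broker defaults.
    static Map<String, String> alterConfigs(Map<String, String> submitted) {
        return new HashMap<>(submitted);
    }

    // Effective config = override if present, else the default.
    static String effective(Map<String, String> overrides,
                            Map<String, String> defaults, String key) {
        return overrides.getOrDefault(key, defaults.get(key));
    }
}
```

So "deleting" the file.delete.delay.ms override just means submitting a map without that key; later broker versions also add IncrementalAlterConfigs for per-key set/delete operations.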

> AlterConfigsRequest with null config value throws NullPointerException
> --
>
> Key: KAFKA-7208
> URL: https://issues.apache.org/jira/browse/KAFKA-7208
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, config, core
>Affects Versions: 1.1.1
>Reporter: Sean Policarpio
>Priority: Major
>
> The following exception is thrown from the Kafka server when an 
> AlterConfigsRequest API request is made via the binary protocol where the 
> CONFIG_ENTRY's CONFIG_VALUE is set to null:
> {noformat}
> [2018-07-26 15:53:01,487] ERROR [Admin Manager on Broker 1000]: Error 
> processing alter configs request for resource Resource(type=TOPIC, 
> name='foo'}, config 
> org.apache.kafka.common.requests.AlterConfigsRequest$Config@692d8300 
> (kafka.server.AdminManager)
> java.lang.NullPointerException
>   at java.util.Hashtable.put(Hashtable.java:459)
>   at java.util.Properties.setProperty(Properties.java:166)
>   at 
> kafka.server.AdminManager$$anonfun$alterConfigs$1$$anonfun$apply$18.apply(AdminManager.scala:357)
>   at 
> kafka.server.AdminManager$$anonfun$alterConfigs$1$$anonfun$apply$18.apply(AdminManager.scala:356)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at 
> kafka.server.AdminManager$$anonfun$alterConfigs$1.apply(AdminManager.scala:356)
>   at 
> kafka.server.AdminManager$$anonfun$alterConfigs$1.apply(AdminManager.scala:339)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at kafka.server.AdminManager.alterConfigs(AdminManager.scala:339)
>   at 
> kafka.server.KafkaApis.handleAlterConfigsRequest(KafkaApis.scala:1994)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:143)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> As a first guess, I'd say the issue is happening 
> [here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/core/src/main/scala/kafka/server/AdminManager.scala#L361],
>  since Hashtable/Properties can't take a null value.
> The reason I'm sending a null for the configuration value is I assumed that 
> since the API documentation says that the value is nullable (see 
> [here|http://kafka.apache.org/protocol.html#The_Messages_AlterConfigs] and 
> [here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java#L53]),
>  this meant the (override) configuration itself would be deleted when a null 
> value was received.
> Contradictory to that, I did notice null checks throughout the code, like 
> [here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java#L92].
> If null is in fact not meant to be accepted by the binary API, this probably 
> needs to be addressed at the protocol level. Further to that, can someone 
> show me how to then remove a configuration override via the binary API?
> For clarity's sake, here's what my request looked like (pseudo/Rust):
> {noformat}
> AlterConfigsRequest {
>   resources: [
> Resource
> {
>   resource_type: 2,
>   resource_name: "foo",
>   config_entries: [
> ConfigEntry
> {
>   config_name: "file.delete.delay.ms",
>   

[jira] [Commented] (KAFKA-7134) KafkaLog4jAppender - Appender exceptions are propagated to caller

2018-07-26 Thread venkata praveen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558077#comment-16558077
 ] 

venkata praveen commented on KAFKA-7134:


This issue is about the 1st one, i.e. 
org.apache.kafka.log4jappender.KafkaLog4jAppender.

 

> KafkaLog4jAppender - Appender exceptions are propagated to caller
> -
>
> Key: KAFKA-7134
> URL: https://issues.apache.org/jira/browse/KAFKA-7134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: venkata praveen
>Assignee: Andras Katona
>Priority: Major
>
> KafkaLog4jAppender exceptions are propagated to the caller when Kafka is 
> down/slow/etc., which may cause the application to crash. Ideally the 
> appender should print and ignore the exception, or should provide an option 
> to ignore/throw exceptions, like the 'ignoreExceptions' property of 
> https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender





[jira] [Commented] (KAFKA-7134) KafkaLog4jAppender - Appender exceptions are propagated to caller

2018-07-26 Thread Andras Katona (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558028#comment-16558028
 ] 

Andras Katona commented on KAFKA-7134:
--

I just realized there are two Kafka log4j appender classes:
 # org.apache.kafka.log4jappender.KafkaLog4jAppender - this is in the kafka 
repository
 # org.apache.logging.log4j.core.appender.mom.kafka.KafkaAppender - in the 
log4j repository

The documentation is about #2, and in my opinion that one should be used 
instead of #1; the appender in the log4j repository is far more sophisticated.

[~venkatpotru], which appender is this issue about?

> KafkaLog4jAppender - Appender exceptions are propagated to caller
> -
>
> Key: KAFKA-7134
> URL: https://issues.apache.org/jira/browse/KAFKA-7134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: venkata praveen
>Assignee: Andras Katona
>Priority: Major
>
> KafkaLog4jAppender exceptions are propagated to the caller when Kafka is 
> down/slow/etc., which may cause the application to crash. Ideally the 
> appender should print and ignore the exception, or should provide an option 
> to ignore/throw exceptions, like the 'ignoreExceptions' property of 
> https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender





[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-26 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556958#comment-16556958
 ] 

Matthias J. Sax commented on KAFKA-6520:


No worries. All good :)

A KIP is a "Kafka Improvement Proposal": 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

It's only required for major features and public API changes, so this ticket 
does not need a KIP. Thus, you should not start a DISCUSS thread, and voting is 
not required either. It might still be helpful to do a similar writeup (could 
be a Google doc) for the design – it might help to get "your feet wet" as you 
phrased it. If you don't want to do it, that's fine, too. I recommend it, as it 
is more efficient to do a full design and ask all open questions at once, 
rather than going back and forth with individual questions in the comment 
section of the ticket. Let us know if you need any help.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.





[jira] [Updated] (KAFKA-7208) AlterConfigsRequest with null config value throws NullPointerException

2018-07-26 Thread Sean Policarpio (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Policarpio updated KAFKA-7208:
---
Description: 
The following exception is thrown from the Kafka server when an 
AlterConfigsRequest API request is made via the binary protocol where the 
CONFIG_ENTRY's CONFIG_VALUE is set to null:
{noformat}
[2018-07-26 15:53:01,487] ERROR [Admin Manager on Broker 1000]: Error 
processing alter configs request for resource Resource(type=TOPIC, name='foo'}, 
config org.apache.kafka.common.requests.AlterConfigsRequest$Config@692d8300 
(kafka.server.AdminManager)
java.lang.NullPointerException
at java.util.Hashtable.put(Hashtable.java:459)
at java.util.Properties.setProperty(Properties.java:166)
at 
kafka.server.AdminManager$$anonfun$alterConfigs$1$$anonfun$apply$18.apply(AdminManager.scala:357)
at 
kafka.server.AdminManager$$anonfun$alterConfigs$1$$anonfun$apply$18.apply(AdminManager.scala:356)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at 
kafka.server.AdminManager$$anonfun$alterConfigs$1.apply(AdminManager.scala:356)
at 
kafka.server.AdminManager$$anonfun$alterConfigs$1.apply(AdminManager.scala:339)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.server.AdminManager.alterConfigs(AdminManager.scala:339)
at 
kafka.server.KafkaApis.handleAlterConfigsRequest(KafkaApis.scala:1994)
at kafka.server.KafkaApis.handle(KafkaApis.scala:143)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:745)
{noformat}
As a first guess, I'd say the issue is happening 
[here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/core/src/main/scala/kafka/server/AdminManager.scala#L361],
 since Hashtable/Properties can't take a null value.
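That guess is easy to verify with the JDK alone (a minimal demonstration, independent of Kafka):

```java
import java.util.Properties;

public class NullConfigValueDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        try {
            // Properties extends Hashtable, which rejects null keys and
            // values, so a null CONFIG_VALUE fails exactly as in the stack
            // trace above (Properties.setProperty -> Hashtable.put).
            props.setProperty("file.delete.delay.ms", null);
            System.out.println("no exception (unexpected)");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException from Hashtable.put");
        }
    }
}
```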

The reason I'm sending a null for the configuration value is I assumed that 
since the API documentation says that the value is nullable (see 
[here|http://kafka.apache.org/protocol.html#The_Messages_AlterConfigs] and 
[here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java#L53]),
 this meant the (override) configuration itself would be deleted when a null 
value was received.

Contradictory to that, I did notice null checks throughout the code, like 
[here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java#L92].

If null is in fact not meant to be accepted by the binary API, this probably 
needs to be addressed at the protocol level. Further to that, can someone show 
me how to then remove a configuration override via the binary API?

For clarity's sake, here's what my request looked like (pseudo/Rust):
{noformat}
AlterConfigsRequest { 
  resources: [ 
Resource { 
  resource_type: 2, 
  resource_name: "foo", 
  config_entries: [ 
ConfigEntry { 
  config_name: "file.delete.delay.ms", 
  config_value: None // serialized as a 16-bit '-1' 
} 
   ] 
  } 
 ], 
 validate_only: false 
}
{noformat}
 


[jira] [Created] (KAFKA-7208) AlterConfigsRequest with null config value throws NullPointerException

2018-07-26 Thread Sean Policarpio (JIRA)
Sean Policarpio created KAFKA-7208:
--

 Summary: AlterConfigsRequest with null config value throws 
NullPointerException
 Key: KAFKA-7208
 URL: https://issues.apache.org/jira/browse/KAFKA-7208
 Project: Kafka
  Issue Type: Bug
  Components: admin, config, core
Affects Versions: 1.1.1
Reporter: Sean Policarpio


The following exception is thrown from the Kafka server when an 
AlterConfigsRequest API request is made via the binary protocol where the 
CONFIG_ENTRY's CONFIG_VALUE is set to null:
{noformat}
[2018-07-26 15:53:01,487] ERROR [Admin Manager on Broker 1000]: Error 
processing alter configs request for resource Resource(type=TOPIC, name='foo'}, 
config org.apache.kafka.common.requests.AlterConfigsRequest$Config@692d8300 
(kafka.server.AdminManager) java.lang.NullPointerException at 
java.util.Hashtable.put(Hashtable.java:459) at 
java.util.Properties.setProperty(Properties.java:166) at 
kafka.server.AdminManager$$anonfun$alterConfigs$1$$anonfun$apply$18.apply(AdminManager.scala:357)
 at 
kafka.server.AdminManager$$anonfun$alterConfigs$1$$anonfun$apply$18.apply(AdminManager.scala:356)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
kafka.server.AdminManager$$anonfun$alterConfigs$1.apply(AdminManager.scala:356) 
at 
kafka.server.AdminManager$$anonfun$alterConfigs$1.apply(AdminManager.scala:339) 
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
kafka.server.AdminManager.alterConfigs(AdminManager.scala:339) at 
kafka.server.KafkaApis.handleAlterConfigsRequest(KafkaApis.scala:1994) at 
kafka.server.KafkaApis.handle(KafkaApis.scala:143) at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at 
java.lang.Thread.run(Thread.java:745)
{noformat}
As a first guess, I'd say the issue is happening 
[here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/core/src/main/scala/kafka/server/AdminManager.scala#L361],
 since Hashtable/Properties can't take a null value.

The reason I'm sending a null for the configuration value is I assumed that 
since the API documentation says that the value is nullable (see 
[here|http://kafka.apache.org/protocol.html#The_Messages_AlterConfigs] and 
[here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java#L53]),
 this meant the (override) configuration itself would be deleted when a null 
value was received.

Contradictory to that, I did notice null checks throughout the code, like 
[here|https://github.com/apache/kafka/blob/49db5a63c043b50c10c2dfd0648f8d74ee917b6a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java#L92].

If null is in fact not meant to be accepted by the binary API, this probably 
needs to be addressed at the protocol level. Further to that, can someone show 
me how to then remove a configuration override via the binary API?

For clarity's sake, here's what my request looked like (pseudo/Rust):
{noformat}
AlterConfigsRequest { 
  resources: [ 
Resource { 
  resource_type: 2, 
  resource_name: "foo", 
  config_entries: [ 
ConfigEntry { 
  config_name: "file.delete.delay.ms", 
  config_value: None // serialized as a 16-bit '-1' 
} 
   ] 
  } 
 ], 
 validate_only: false 
}
{noformat}
 


