[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560622#comment-16560622
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang closed pull request #5430: KAFKA-7192 Follow-up: update checkpoint 
to the reset beginning offset
URL: https://github.com/apache/kafka/pull/5430
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index c1a41cefc23..3bbf42ead27 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@
 
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final String storeName;
     private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
     private long endingOffset;
@@ -45,7 +45,7 @@
                   final String storeName) {
         this.partition = partition;
         this.compositeRestoreListener = compositeRestoreListener;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
         this.storeName = storeName;
@@ -60,7 +60,11 @@ public String storeName() {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
     }
 
     void restoreStarted() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 1927b5a7af7..9185920f242 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private final Set<TopicPartition> needsRestoring = new HashSet<>();
+    private final Set<TopicPartition> needsInitializing = new HashSet<>();
+    private final Set<TopicPartition> completedRestorers = new HashSet<>();
     private final Duration pollTime;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
@@ -64,9 +65,14 @@ public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        if (!stateRestorers.containsKey(restorer.partition())) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+
+            log.trace("Added restorer for changelog {}", restorer.partition());
+        }
+
+        needsInitializing.add(restorer.partition());
     }
 
     public Collection<TopicPartition> restore(final RestoringTasks active) {
@@ -81,16 +87,15 @@ public void register(final StateRestorer restorer) {
 
         try {
             final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
-            final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
-            while (iterator.hasNext()) {
-                final TopicPartition partition = iterator.next();
+
+            for (final TopicPartition partition : needsRestoring) {
                 final StateRestorer restorer = stateRestorers.get(partition);
                 final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
                 restorer.setRestoredOffset(pos);
                 if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
                     restorer.restoreDone();
                     endOffsets.remove(partition);
-                    iterator.remove();
+
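For context, a minimal sketch (hypothetical surrounding code, not the PR's exact change) of how the new setCheckpointOffset hook can be used once the restore consumer has been reset to the beginning of a changelog, so the checkpoint reflects the reset position instead of the NO_CHECKPOINT sentinel:

{noformat}
// Sketch only: when no usable checkpoint exists, seek to the beginning and record
// the consumer's current position as the restorer's checkpoint offset; otherwise
// resume from the stored checkpoint.
if (restorer.checkpoint() == StateRestorer.NO_CHECKPOINT) {
    restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
    restorer.setCheckpointOffset(restoreConsumer.position(restorer.partition()));
} else {
    restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
}
{noformat}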

[jira] [Commented] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560409#comment-16560409
 ] 

Guozhang Wang commented on KAFKA-7190:
--

What you described looks reasonable to me. I'd like to have [~hachikuji] also 
chime in here since he originally implemented this logic and can shed some 
light as well.

> Under low traffic conditions purging repartition topics cause WARN statements 
> about  UNKNOWN_PRODUCER_ID 
> -
>
> Key: KAFKA-7190
> URL: https://issues.apache.org/jira/browse/KAFKA-7190
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, streams
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bill Bejeck
>Assignee: lambdaliu
>Priority: Major
>
> When a streams application has little traffic, then it is possible that 
> consumer purging would delete
> even the last message sent by a producer (i.e., all the messages sent by
> this producer have been consumed and committed), and as a result, the broker
> would delete that producer's ID. The next time this producer tries to
> send, it will get an UNKNOWN_PRODUCER_ID error code, but in this case
> the error is retriable: the producer would just get a new producer id and
> retry, and this time it will succeed. 
>  
> Possible fixes could be on the broker side, i.e., delaying the deletion of 
> the producer IDs for a more extended period, or on the streams side, developing 
> a more conservative approach to deleting offsets from repartition topics
>  
>  
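To make the streams-side option concrete, here is an illustrative sketch (hypothetical helper class and margin, not Streams' actual purge code) of a more conservative purge that keeps a small tail of already-committed records in the repartition topic, so the producer's last batch, and with it its producer id, is not deleted on the broker under low traffic:

{noformat}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

// Hypothetical sketch: purge committed records from a repartition topic, but always
// leave the most recent KEEP_LAST_RECORDS records behind so the broker does not
// expire the producer id of an idle producer.
final class ConservativePurger {
    private static final long KEEP_LAST_RECORDS = 100L;   // assumed safety margin

    private final AdminClient adminClient;

    ConservativePurger(final AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    void purgeUpTo(final TopicPartition repartitionPartition, final long committedOffset) {
        final long deleteBefore = committedOffset - KEEP_LAST_RECORDS;
        if (deleteBefore > 0) {
            final Map<TopicPartition, RecordsToDelete> request = Collections.singletonMap(
                repartitionPartition, RecordsToDelete.beforeOffset(deleteBefore));
            // Asynchronous; callers can inspect the returned DeleteRecordsResult futures.
            adminClient.deleteRecords(request);
        }
    }
}
{noformat}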



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7213) NullPointerException during state restoration in kafka streams

2018-07-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560405#comment-16560405
 ] 

Guozhang Wang commented on KAFKA-7213:
--

Thanks for reporting this issue [~abhishek.agarwal]. Before you start, I'd 
suggest checking out the latest trunk (to be released as 2.0 soon) and 
seeing whether this issue has already been resolved.

> NullPointerException during state restoration in kafka streams
> --
>
> Key: KAFKA-7213
> URL: https://issues.apache.org/jira/browse/KAFKA-7213
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Abhishek Agarwal
>Assignee: Abhishek Agarwal
>Priority: Major
>
> I had written a custom state store which has a batch restoration callback 
> registered. What I have observed is that when multiple consumer instances are 
> restarted, the application keeps failing with a NullPointerException. The stack 
> trace is 
> {noformat}
> java.lang.NullPointerException: null
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.putAll(RocksDBStore.java:351)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore.putAll(RocksDBSlotKeyValueBytesStore.java:100)
>  ~[streams-core-1.0.0.297.jar:?]
>   at 
> org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore$SlotKeyValueBatchRestoreCallback.restoreAll(RocksDBSlotKeyValueBytesStore.java:303)
>  ~[streams-core-1.0.0.297.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreAll(CompositeRestoreListener.java:89)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:75)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:277)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:238)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>  ~[kafka-streams-1.0.0.jar:?]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
>  ~[kafka-streams-1.0.0.jar:?]
> {noformat}
> The faulty line in question is 
> {noformat}
> db.write(wOptions, batch);
> {noformat}
> in RocksDBStore.java, which would mean that the db variable is null. Probably the 
> store has been closed while restoration is still being done on it. After going 
> through the code, I think the problem occurs when the state transitions from 
> PARTITIONS_ASSIGNED to PARTITIONS_REVOKED while restoration is still in 
> progress. 
> During this state transition, the active tasks themselves are closed, but the 
> changelog reader is not reset. It then tries to restore tasks that have 
> already been closed; db is null, which results in the NPE. 
> I will put in a fix to see if that fixes the issue. 
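As a side note, the kind of guard sketched below (hypothetical class, not the actual Kafka fix, and not a substitute for resetting the changelog reader) would at least turn this race into a descriptive error instead of an NPE deep inside putAll():

{noformat}
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a store wrapper that fails fast with a clear message when a
// restore callback runs against a store that has already been closed.
final class GuardedStore {
    private final String name;
    private volatile Object dbHandle;   // stands in for the RocksDB handle
    private volatile boolean open;

    GuardedStore(final String name, final Object dbHandle) {
        this.name = name;
        this.dbHandle = dbHandle;
        this.open = true;
    }

    void close() {
        open = false;
        dbHandle = null;
    }

    void putAll(final List<Map.Entry<byte[], byte[]>> entries) {
        if (!open || dbHandle == null) {
            throw new IllegalStateException(
                "Store " + name + " is closed; restoration for it should have been cancelled");
        }
        // ... write the batch through the real store handle here ...
    }
}
{noformat}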



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6690) Priorities for Source Topics

2018-07-27 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560402#comment-16560402
 ] 

Guozhang Wang commented on KAFKA-6690:
--

If we add it to the consumer API, then Streams may get this feature automatically, 
since it is built on top of consumers (some additional considerations would apply 
to Streams, since today its choice of which record to process next depends purely 
on timestamp synchronization, but this can be deferred to a follow-up 
discussion).

Maybe you can start the discussion thread on the mailing list and ask for the 
community's opinion on this feature request.
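As an application-level illustration of what such prioritisation could look like today (a sketch using the existing pause/resume consumer APIs, not the proposed feature; topic names and settings are assumed):

{noformat}
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: prefer HighPriorityTopic by pausing LowPriorityTopic's partitions whenever
// the last poll still returned high-priority records.
public class PriorityPollLoop {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed brokers
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "priority-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("HighPriorityTopic", "LowPriorityTopic"));
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

                boolean highPriorityBusy = false;
                for (final ConsumerRecord<String, String> record : records) {
                    if (record.topic().equals("HighPriorityTopic")) {
                        highPriorityBusy = true;
                    }
                    // process(record);
                }

                final Set<TopicPartition> lowPriority = new HashSet<>();
                for (final TopicPartition tp : consumer.assignment()) {
                    if (tp.topic().equals("LowPriorityTopic")) {
                        lowPriority.add(tp);
                    }
                }

                // Starve the low-priority topic while high-priority data keeps arriving.
                if (highPriorityBusy) {
                    consumer.pause(lowPriority);
                } else {
                    consumer.resume(lowPriority);
                }
            }
        }
    }
}
{noformat}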

> Priorities for Source Topics
> 
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bala Prassanna I
>Assignee: Nick Afshartous
>Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a 
> consumer is listening to more than one topic, say, HighPriorityTopic and 
> LowPriorityTopic, it should consume events from LowPriorityTopic only when 
> all the events from HighPriorityTopic are consumed. This is needed in Kafka 
> Streams processor topologies as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-07-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560326#comment-16560326
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

guozhangwang opened a new pull request #5430: KAFKA-7192 Follow-up: update 
checkpoint to the reset beginning offset
URL: https://github.com/apache/kafka/pull/5430
 
 
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.1
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-27 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560143#comment-16560143
 ] 

Yogesh BG commented on KAFKA-7209:
--

With offsets.topic.replication.factor set to 1, I still receive something like 
the below:

 

{{ Received GroupCoordinator response 
ClientResponse(receivedTimeMs=1532716985393, latencyMs=15, disconnected=false, 
requestHeader=\{api_key=10,api_version=1,correlation_id=6157,client_id=ks_0_inst_THUNDER_METRICS-StreamThread-37-consumer},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))) for group 
aggregation-framework030_THUNDER_METRICS}}{{18:43:05.394 
[ks_0_inst_THUNDER_LOG_L4-StreamThread-70] DEBUG 
o.a.k.c.consumer.internals.Fetcher - Fetch READ_UNCOMMITTED at offset 0 for 
partition THUNDER_LOG_L4_PC-17 returned fetch data (error=NONE, 
highWaterMark=0, lastStableOffset = -1, logStartOffset = 0, abortedTransactions 
= null, recordsSizeInBytes=0)}}

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use kafka streams 0.11.0.1, session timeout 6, retries set to int.max and 
> backoff time left at the default.
>  
> I have 3 nodes running a kafka cluster of 3 brokers,
> and I am running 3 kafka streams instances with the same 
> [application.id|http://application.id/]
> Each node has one broker and one kafka streams application.
> Everything works fine during setup.
> I bring down one node, so one kafka broker and one streaming app are down.
> Now I see exceptions in the other two streaming apps and it never gets 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup 
> and restart, but this also doesn't help.
> Can anyone help me?
>  
>  
>  
>  One thing I observed lately is that kafka topics with one partition do get 
> reassigned, but I have topics of 16 partitions and replication factor 3. They 
> never settle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5407) Mirrormaker dont start after upgrade

2018-07-27 Thread Fernando Vega (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560099#comment-16560099
 ] 

Fernando Vega edited comment on KAFKA-5407 at 7/27/18 6:31 PM:
---

[~omkreddy] [~hachikuji] [~huxi_2b]
 Just double checking. I tried this again and found a few things:

 

a- Once I upgraded the cluster, I attempted to use the new consumer file again 
for the mirrormakers we have, whitelisting the same topics, and I get the same 
exception.

b- However, I did another test using the exact same configs that the production 
topics use; the only difference was that I created a single topic, in order to check 
whether the issue was something related to Kafka or to the installed package. I was 
able to mirror my dummy messages using all the new files and configs that we have 
for production, and it worked just fine. But with the current production topics 
it doesn't.

c- Also, we have been seeing that sometimes the mirrormaker threads die for no 
reason. I see messages in the logs stating that the mirrormaker was 
shut down successfully, even though we haven't stopped or restarted them, which is 
what would normally produce this message.

d- Sometimes when we use the consumer group script to check the consumption lag, 
we see the list of topics and their consumers, but in some cases when we 
display the information we see topics with no consumers; what we do then is 
stop MM, remove the consumer group, and start MM again, and that seems to fix it.

If you guys can provide any suggestions that would be great; also, any tool you 
suggest we can use to check, monitor or troubleshoot this behavior would be 
great as well.

Listed below are current configs:
{noformat}

###
### This file is managed by Puppet.
###

# See http://kafka.apache.org/documentation.html#brokerconfigs for default 
values.

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=31

# The port the socket server listens on
port=9092

# A comma seperated list of directories under which to store log files
log.dirs=/kafka1/datalog,/kafka2/datalog,/kafka3/datalog,/kafka4/datalog,/kafka5/datalog,/kafka6/datalog,/kafka7/datalog,/kafka8/datalog,/kafka9/datalog,/kafka10/datalog

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.

zookeeper.connect=zookeeper1-repl:2181,zookeeper2-repl:2181,zookeeper3-repl:2181,zookeeper4-repl:2181,zookeeper5-repl:2181/replication/kafka
 # Additional configuration options may follow here
auto.leader.rebalance.enable=true
delete.topic.enable=true
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
default.replication.factor=2
auto.create.topics.enable=true
num.partitions=1
num.network.threads=8
num.io.threads=40
log.retention.hours=1
log.roll.hours=1
num.replica.fetchers=8
zookeeper.connection.timeout.ms=3
zookeeper.session.timeout.ms=3
inter.broker.protocol.version=0.10.2
log.message.format.version=0.8.2

{noformat}

Producer
{noformat}
# Producer
# sjc2
bootstrap.servers=app454.sjc2.com:9092,app455.sjc2.com:9092,app456.sjc2.com:9092,app457.sjc2.com:9092,app458.sjc2.com:9092,app459.sjc2.com:9092

# Producer Configurations
acks=0
buffer.memory=67108864
compression.type=gzip
linger.ms=10
reconnect.backoff.ms=100
request.timeout.ms=12
retry.backoff.ms=1000
{noformat}

Consumer
{noformat}
bootstrap.servers=app043.atl2.com:9092,app044.atl2.com:9092,app045.atl2.com:9092,app046.atl2.com:9092,app047.atl2.com:9092,app048.atl2.com:9092

group.id=MirrorMaker_atl1

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

receive.buffer.bytes=1048576
send.buffer.bytes=1048576

session.timeout.ms=25

key.deserializer=org.apache.kafka.common.serialization.Deserializer
value.deserializer=org.apache.kafka.common.serialization.Deserializer

{noformat}


was (Author: fvegaucr):
[~omkreddy] [~hachikuji] [~huxi_2b]
Just double checking. I try this again and I found a few things:

 

a- Once I upgraded the cluster, I attempted to use the new consumer file again 
for the mirrormakers  we have whitelisting the same topics and I get the same 
exception. 

b- However I did another test, using same exact configs that the production 
topics used the only difference was I created a single topic in order to check 
if the issue was something realted with Kafka or the package installed and 
mirroring using all new files, and it worked just fine. But with the current 
production topics it doesnt?

c- Also we have seeing that sometimes the mirrormaker threads die with no 
reason, I see messages in the logs where it states that the mirrormaker was 
shutdown successfully, however we havent stop them or restart them 

[jira] [Commented] (KAFKA-5407) Mirrormaker dont start after upgrade

2018-07-27 Thread Fernando Vega (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560099#comment-16560099
 ] 

Fernando Vega commented on KAFKA-5407:
--

[~omkreddy] [~hachikuji] [~huxi_2b]
Just double checking. I tried this again and found a few things:

 

a- Once I upgraded the cluster, I attempted to use the new consumer file again 
for the mirrormakers we have, whitelisting the same topics, and I get the same 
exception. 

b- However, I did another test using the exact same configs that the production 
topics use; the only difference was that I created a single topic, in order to check 
whether the issue was something related to Kafka or to the installed package, 
mirroring using all new files, and it worked just fine. But with the current 
production topics it doesn't.

c- Also, we have been seeing that sometimes the mirrormaker threads die for no 
reason. I see messages in the logs stating that the mirrormaker was 
shut down successfully, even though we haven't stopped or restarted them, which is 
what would normally produce this message.

d- Sometimes when we use the consumer group script to check the LAG of consumption, 
we see the list of topics and their consumers, but in some cases when we 
display the information we see topics with no consumers; what we do then is 
stop MM, remove the consumer group, and start MM again, and that seems to fix it.



If you guys can provide any suggestions that would be great; also, any tool you 
suggest we can use to check, monitor or troubleshoot this behavior would be 
great as well.

> Mirrormaker dont start after upgrade
> 
>
> Key: KAFKA-5407
> URL: https://issues.apache.org/jira/browse/KAFKA-5407
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.1
> Environment: Operating system
> CentOS 6.8
> HW
> Board Mfg : HP
>  Board Product : ProLiant DL380p Gen8
> CPU's x2
> Product Manufacturer  : Intel
>  Product Name  :  Intel(R) Xeon(R) CPU E5-2660 v2 @ 2.20GHz
>  Memory Type   : DDR3 SDRAM
>  SDRAM Capacity: 2048 MB
>  Total Memory: : 64GB
> Hardrives size and layout:
> 9 drives using jbod
> drive size 3.6TB each
>Reporter: Fernando Vega
>Priority: Critical
> Attachments: broker.hkg1.new, debug.hkg1.new, 
> mirrormaker-repl-sjc2-to-hkg1.log.8
>
>
> Currently I'm upgrading the cluster from 0.8.2-beta to 0.10.2.1.
> So I followed the rolling procedure:
> Here the config files:
> Consumer
> {noformat}
> #
> # Cluster: repl
> # Topic list(goes into command line): 
> REPL-ams1-global,REPL-atl1-global,REPL-sjc2-global,REPL-ams1-global-PN_HXIDMAP_.*,REPL-atl1-global-PN_HXIDMAP_.*,REPL-sjc2-global-PN_HXIDMAP_.*,REPL-ams1-global-PN_HXCONTEXTUALV2_.*,REPL-atl1-global-PN_HXCONTEXTUALV2_.*,REPL-sjc2-global-PN_HXCONTEXTUALV2_.*
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> group.id=hkg1_cluster
> auto.commit.interval.ms=6
> partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
> {noformat}
> Producer
> {noformat}
>  hkg1
> # # Producer
> # # hkg1
> bootstrap.servers=app001:9092,app002:9092,app003:9092,app004:9092
> compression.type=gzip
> acks=0
> {noformat}
> Broker
> {noformat}
> auto.leader.rebalance.enable=true
> delete.topic.enable=true
> socket.receive.buffer.bytes=1048576
> socket.send.buffer.bytes=1048576
> default.replication.factor=2
> auto.create.topics.enable=true
> num.partitions=1
> num.network.threads=8
> num.io.threads=40
> log.retention.hours=1
> log.roll.hours=1
> num.replica.fetchers=8
> zookeeper.connection.timeout.ms=3
> zookeeper.session.timeout.ms=3
> inter.broker.protocol.version=0.10.2
> log.message.format.version=0.8.2
> {noformat}
> I also tried using the stock configuration, with no luck.
> The error that I get is this:
> {noformat}
> 2017-06-07 12:24:45,476] INFO ConsumerConfig values:
>   auto.commit.interval.ms = 6
>   auto.offset.reset = latest
>   bootstrap.servers = [app454.sjc2.mytest.com:9092, 
> app455.sjc2.mytest.com:9092, app456.sjc2.mytest.com:9092, 
> app457.sjc2.mytest.com:9092, app458.sjc2.mytest.com:9092, 
> app459.sjc2.mytest.com:9092]
>   check.crcs = true
>   client.id = MirrorMaker_hkg1-1
>   connections.max.idle.ms = 54
>   enable.auto.commit = false
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = MirrorMaker_hkg1
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 30
>   max.poll.records = 500
>   metadata.max.age.ms = 30
>   

[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-27 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559935#comment-16559935
 ] 

Yogesh BG commented on KAFKA-7209:
--

Hi, can you suggest anything I am missing? We are blocked for our product release 
due to this bug... Is there any way I can safely clean the kstreams and restart them 
with the same application.id? During this process some amount of data loss 
is also fine...

Or even a confirmation that it's a bug in the streaming app could help me decide 
what alternative restart process I can use...

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use kafka streams 0.11.0.1, session timeout 6, retries set to int.max and 
> backoff time left at the default.
>  
> I have 3 nodes running a kafka cluster of 3 brokers,
> and I am running 3 kafka streams instances with the same 
> [application.id|http://application.id/]
> Each node has one broker and one kafka streams application.
> Everything works fine during setup.
> I bring down one node, so one kafka broker and one streaming app are down.
> Now I see exceptions in the other two streaming apps and it never gets 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup 
> and restart, but this also doesn't help.
> Can anyone help me?
>  
>  
>  
>  One thing I observed lately is that kafka topics with one partition do get 
> reassigned, but I have topics of 16 partitions and replication factor 3. They 
> never settle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7213) NullPointerException during state restoration in kafka streams

2018-07-27 Thread Abhishek Agarwal (JIRA)
Abhishek Agarwal created KAFKA-7213:
---

 Summary: NullPointerException during state restoration in kafka 
streams
 Key: KAFKA-7213
 URL: https://issues.apache.org/jira/browse/KAFKA-7213
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Abhishek Agarwal
Assignee: Abhishek Agarwal


I had written a custom state store which has a batch restoration callback 
registered. What I have observed is that when multiple consumer instances are 
restarted, the application keeps failing with a NullPointerException. The stack 
trace is 
{noformat}
java.lang.NullPointerException: null
at 
org.apache.kafka.streams.state.internals.RocksDBStore.putAll(RocksDBStore.java:351)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore.putAll(RocksDBSlotKeyValueBytesStore.java:100)
 ~[streams-core-1.0.0.297.jar:?]
at 
org.apache.kafka.streams.state.internals.RocksDBSlotKeyValueBytesStore$SlotKeyValueBatchRestoreCallback.restoreAll(RocksDBSlotKeyValueBytesStore.java:303)
 ~[streams-core-1.0.0.297.jar:?]
at 
org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreAll(CompositeRestoreListener.java:89)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:75)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:277)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restorePartition(StoreChangelogReader.java:238)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:263)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:803)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
 ~[kafka-streams-1.0.0.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
 ~[kafka-streams-1.0.0.jar:?]
{noformat}

The faulty line in question is 
{noformat}
db.write(wOptions, batch);
{noformat}

in RocksDBStore.java, which would mean that the db variable is null. Probably the 
store has been closed while restoration is still being done on it. After going 
through the code, I think the problem occurs when the state transitions from 
PARTITIONS_ASSIGNED to PARTITIONS_REVOKED while restoration is still in progress. 
During this state transition, the active tasks themselves are closed, but the 
changelog reader is not reset. It then tries to restore tasks that have already 
been closed; db is null, which results in the NPE. 

I will put in a fix to see if that fixes the issue. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7209) Kafka stream does not rebalance when one node gets down

2018-07-27 Thread Yogesh BG (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559492#comment-16559492
 ] 

Yogesh BG commented on KAFKA-7209:
--

I tried setting these configurations, but no luck.

 

{{conf.put("retries",Integer.MAX_VALUE);}}

{{conf.put("rebalance.max.retries",Integer.MAX_VALUE);}}

{{conf.put("zookeeper.session.timeout.ms",1000);}}

> Kafka stream does not rebalance when one node gets down
> ---
>
> Key: KAFKA-7209
> URL: https://issues.apache.org/jira/browse/KAFKA-7209
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Yogesh BG
>Priority: Critical
>
> I use kafka streams 0.11.0.1, session timeout 6, retries set to int.max and 
> backoff time left at the default.
>  
> I have 3 nodes running a kafka cluster of 3 brokers,
> and I am running 3 kafka streams instances with the same 
> [application.id|http://application.id/]
> Each node has one broker and one kafka streams application.
> Everything works fine during setup.
> I bring down one node, so one kafka broker and one streaming app are down.
> Now I see exceptions in the other two streaming apps and it never gets 
> rebalanced; I waited for hours and it never comes back to normal.
> Is there anything I am missing?
> I also tried, when one broker is down, calling stream.close, cleanup 
> and restart, but this also doesn't help.
> Can anyone help me?
>  
>  
>  
>  One thing I observed lately is that kafka topics with one partition do get 
> reassigned, but I have topics of 16 partitions and replication factor 3. They 
> never settle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7212) Bad exception message on failed serialization

2018-07-27 Thread D T (JIRA)
D T created KAFKA-7212:
--

 Summary: Bad exception message on failed serialization
 Key: KAFKA-7212
 URL: https://issues.apache.org/jira/browse/KAFKA-7212
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 1.1.1, 1.0.1
Reporter: D T


I use Spring-Kafka to connect to a Kafka server. While trying to use Spring's 
MessageConverter I encountered strange error messages that did not make any 
sense to me.

 

 
{noformat}
org.apache.kafka.common.errors.SerializationException: Can't convert value of 
class org.springframework.messaging.support.GenericMessage to class 
org.apache.kafka.common.serialization.ByteArraySerializer specified in 
value.serializer
Caused by: java.lang.ClassCastException: 
org.springframework.messaging.support.GenericMessage cannot be cast to [B
    at 
org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at 
org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at 
org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:791)
    at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
    at 
org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:285)
    at 
org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:349)
    at 
org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:182){noformat}
My question was: why would Kafka try to cast/convert Spring's GenericMessage to 
Kafka's ByteArraySerializer?

After quite some time trying various config options I debugged the code and 
found that the exception message was just bad.

The message should be something like
{noformat}
Can't convert value of class 
org.springframework.messaging.support.GenericMessage to byte[] in class 
org.apache.kafka.common.serialization.ByteArraySerializer specified in 
value.serializer{noformat}
 The issue is caused by these lines:
[https://github.com/apache/kafka/blob/1.1.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L801]

and
[https://github.com/apache/kafka/blob/1.1.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L809]
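A small sketch of the clearer wrapping the reporter is asking for (hypothetical helper, not the actual KafkaProducer code): name the input type the serializer expects rather than implying the value is cast to the serializer class itself:

{noformat}
import org.apache.kafka.common.errors.SerializationException;

// Hypothetical sketch: build a SerializationException whose message makes clear that
// the record value could not be converted to the serializer's expected input type.
final class SerializationErrors {
    private SerializationErrors() { }

    static SerializationException wrapValueCastError(final Object value,
                                                     final Class<?> serializerClass,
                                                     final ClassCastException cause) {
        final String message = "Can't convert value of class " + value.getClass().getName()
            + " to the input type of serializer " + serializerClass.getName()
            + " specified in value.serializer";
        return new SerializationException(message, cause);
    }
}
{noformat}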

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6690) Priorities for Source Topics

2018-07-27 Thread Bala Prassanna I (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559356#comment-16559356
 ] 

Bala Prassanna I commented on KAFKA-6690:
-

[~guozhang] We would need this in both Consumer API and also in Streams API

> Priorities for Source Topics
> 
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bala Prassanna I
>Assignee: Nick Afshartous
>Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a 
> consumer is listening to more than one topic, say, HighPriorityTopic and 
> LowPriorityTopic, it should consume events from LowPriorityTopic only when 
> all the events from HighPriorityTopic are consumed. This is needed in Kafka 
> Streams processor topologies as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6690) Priorities for Source Topics

2018-07-27 Thread Bala Prassanna I (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559356#comment-16559356
 ] 

Bala Prassanna I edited comment on KAFKA-6690 at 7/27/18 7:08 AM:
--

[~guozhang] We would need this in both Consumer API and Streams API


was (Author: balaprassanna):
[~guozhang] We would need this in both Consumer API and also in Streams API

> Priorities for Source Topics
> 
>
> Key: KAFKA-6690
> URL: https://issues.apache.org/jira/browse/KAFKA-6690
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Bala Prassanna I
>Assignee: Nick Afshartous
>Priority: Major
>
> We often encounter use cases where we need to prioritise source topics. If a 
> consumer is listening more than one topic, say, HighPriorityTopic and 
> LowPriorityTopic, it should consume events from LowPriorityTopic only when 
> all the events from HighPriorityTopic are consumed. This is needed in Kafka 
> Streams processor topologies as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7134) KafkaLog4jAppender - Appender exceptions are propagated to caller

2018-07-27 Thread Andras Katona (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559353#comment-16559353
 ] 

Andras Katona commented on KAFKA-7134:
--

When using the Kafka appender, logging from the org.apache.kafka.* packages should 
be disabled.
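A minimal log4j.properties sketch of that suggestion (appender names are assumed): send org.apache.kafka.* log events only to a local appender and stop them from propagating to the root logger, so the Kafka appender never has to publish its own client's log lines:

{noformat}
# Root logger writes to a local file and to the Kafka appender
log4j.rootLogger=INFO, file, kafka

# Kafka client internals: log locally only, and do not propagate to the root logger
log4j.logger.org.apache.kafka=WARN, file
log4j.additivity.org.apache.kafka=false
{noformat}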

> KafkaLog4jAppender - Appender exceptions are propagated to caller
> -
>
> Key: KAFKA-7134
> URL: https://issues.apache.org/jira/browse/KAFKA-7134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: venkata praveen
>Assignee: Andras Katona
>Priority: Major
>
> KafkaLog4jAppender exceptions are propagated to the caller when Kafka is 
> down/slow/other, which may cause the application to crash. Ideally the appender 
> should print and ignore the exception,
>  or should provide an option to ignore/throw the exceptions like the 
> 'ignoreExceptions' property of 
> https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)