[jira] [Assigned] (KAFKA-5245) KStream builder should capture serdes

2017-07-03 Thread anugrah (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

anugrah reassigned KAFKA-5245:
--

Assignee: Evgeny Veretennikov

> KStream builder should capture serdes 
> --
>
> Key: KAFKA-5245
> URL: https://issues.apache.org/jira/browse/KAFKA-5245
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0, 0.10.2.1
>Reporter: Yeva Byzek
>Assignee: Evgeny Veretennikov
>Priority: Minor
>  Labels: needs-kip
>
> Even if one specifies serdes in `builder.stream`, a later call to 
> `groupByKey` may require the serdes again if they differ from the configured 
> default serdes of the Streams application. The preferred behavior is that if no 
> serdes are provided to `groupByKey`, it should use whatever was provided in 
> `builder.stream` rather than the application's defaults.
> From the current docs:
> “When to set explicit serdes: Variants of groupByKey exist to override the 
> configured default serdes of your application, which you must do if the key 
> and/or value types of the resulting KGroupedStream do not match the 
> configured default serdes.”
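To make the requested behaviour concrete, here is a minimal plain-Java model of it. It is not the Kafka Streams API: `SerdeSpec`, `StreamBuilderModel`, `SourceStream` and `GroupedSourceStream` are hypothetical stand-ins, and serdes are represented only by their names. A stream remembers the serdes it was created with, the no-argument `groupByKey()` inherits them instead of falling back to the application defaults, and an explicit argument still overrides them.

```java
import java.util.Objects;

// Hypothetical stand-in for a key/value serde pair; only the names are tracked.
final class SerdeSpec {
    final String keySerde;
    final String valueSerde;

    SerdeSpec(String keySerde, String valueSerde) {
        this.keySerde = Objects.requireNonNull(keySerde);
        this.valueSerde = Objects.requireNonNull(valueSerde);
    }
}

// Hypothetical builder: streams created with explicit serdes capture them.
final class StreamBuilderModel {
    final SerdeSpec appDefaults;

    StreamBuilderModel(SerdeSpec appDefaults) {
        this.appDefaults = appDefaults;
    }

    SourceStream stream(SerdeSpec serdes, String topic) {
        return new SourceStream(serdes);
    }

    SourceStream stream(String topic) {
        // No serdes given: fall back to the application-wide defaults.
        return new SourceStream(appDefaults);
    }
}

final class SourceStream {
    final SerdeSpec serdes;

    SourceStream(SerdeSpec serdes) {
        this.serdes = serdes;
    }

    // Proposed behaviour: inherit the serdes captured when the stream was created.
    GroupedSourceStream groupByKey() {
        return new GroupedSourceStream(serdes);
    }

    // An explicit override still takes precedence.
    GroupedSourceStream groupByKey(SerdeSpec override) {
        return new GroupedSourceStream(override);
    }
}

final class GroupedSourceStream {
    final SerdeSpec serdes;

    GroupedSourceStream(SerdeSpec serdes) {
        this.serdes = serdes;
    }
}
```

Under this model, a stream built with String/Long serdes keeps them through `groupByKey()`, while a stream created without serdes still uses the application defaults.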



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5245) KStream builder should capture serdes

2017-07-03 Thread anugrah (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

anugrah reassigned KAFKA-5245:
--

Assignee: (was: anugrah)






[jira] [Resolved] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-03 Thread Nishkam Ravi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nishkam Ravi resolved KAFKA-5528.
-
Resolution: Fixed

> Error while reading topic, offset, partition info from process method
> -
>
> Key: KAFKA-5528
> URL: https://issues.apache.org/jira/browse/KAFKA-5528
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Nishkam Ravi
>
> We are encountering an {{IllegalStateException}} while trying to access 
> {{context.topic()}} from the {{process}} method. The code is written in Scala 
> and is launched using sbt (Spring isn't involved). Here's a sketch of the code:
> {noformat}
> class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T],
>     decrypt: Boolean, config: Config)
>     extends AbstractProcessor[Array[Byte], Array[Byte]] with LazyLogging {
>
>   private var hsmClient: HSMClient = _
>
>   override def init(processorContext: ProcessorContext): Unit = {
>     super.init(processorContext)
>     hsmClient = HSMClient(config).getOrElse(null)
>   }
>
>   override def process(key: Array[Byte], value: Array[Byte]): Unit = {
>     // Metadata of the record currently being processed
>     val topic: String = this.context.topic()
>     val partition: Int = this.context.partition()
>     val offset: Long = this.context.offset()
>     val timestamp: Long = this.context.timestamp()
>     // business logic
>   }
> }
> {noformat}
> The exception is thrown only in the multi-consumer case (when the number of 
> partitions for a topic is > 1 and parallelism is > 1). 





[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-03 Thread Nishkam Ravi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072997#comment-16072997
 ] 

Nishkam Ravi commented on KAFKA-5528:
-

Thanks [~mjsax] for the pointer! That seems to have helped (will do some more 
testing to confirm). Yeah, adding to the documentation/FAQ would be a good idea.






[jira] [Commented] (KAFKA-2260) Allow specifying expected offset on produce

2017-07-03 Thread Sven Linstaedt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072735#comment-16072735
 ] 

Sven Linstaedt commented on KAFKA-2260:
---

With KAFKA-4815 released, I am still trying to figure out how to get 
application-level idempotency for sent records. Setting a system-wide 
_transactional.id_ for all producers will cause very high contention, even 
though one is committing each and every single produce request. Contention would 
be lower with a topic- or even partition-specific _transactional.id_, 
but as the configuration setting is per producer and not per sent record, one 
has to initialize dozens or even hundreds of producers. Whatever you choose, 
it doesn't sound optimal in any way.

Any other ideas?

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Kirwin
>Priority: Minor
> Attachments: expected-offsets.patch, KAFKA-2260.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> changed, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
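The expected-offset check proposed above can be illustrated with an in-memory log. This is a minimal sketch under stated assumptions: `ExpectedOffsetLog` and `CasFailedException` are hypothetical names, the log is a single partition living in one process, and `-1` plays the role of the sigil value meaning "no check", as the proposal suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical error corresponding to the proposed "CAS failure" error code.
final class CasFailedException extends RuntimeException {
    CasFailedException(long expected, long actual) {
        super("expected offset " + expected + " but next offset is " + actual);
    }
}

// Hypothetical single-partition log with optimistic, offset-based concurrency control.
final class ExpectedOffsetLog {
    static final long NO_CHECK = -1L;

    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns its offset. If expectedOffset is not NO_CHECK,
    // the append only succeeds when it equals the offset the log would assign;
    // otherwise nothing is written and CasFailedException is thrown.
    synchronized long append(long expectedOffset, String message) {
        long nextOffset = messages.size();
        if (expectedOffset != NO_CHECK && expectedOffset != nextOffset) {
            throw new CasFailedException(expectedOffset, nextOffset);
        }
        messages.add(message);
        return nextOffset;
    }

    synchronized long nextOffset() {
        return messages.size();
    }
}
```

A producer that retries `append(0, "a")` after a timeout, when the first attempt actually succeeded, gets a `CasFailedException` instead of writing a duplicate, which is the at-most-once retry property described under Motivation.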





[jira] [Commented] (KAFKA-1120) Controller could miss a broker state change

2017-07-03 Thread Andrew Olson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072663#comment-16072663
 ] 

Andrew Olson commented on KAFKA-1120:
-

[~wushujames] Can you retest with Kafka 0.11 to see if KAFKA-1211 resolves this 
problem?

> Controller could miss a broker state change 
> 
>
> Key: KAFKA-1120
> URL: https://issues.apache.org/jira/browse/KAFKA-1120
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.1
>Reporter: Jun Rao
>  Labels: reliability
>
> When the controller is in the middle of processing a task (e.g., preferred 
> leader election, broker change), it holds a controller lock. During this 
> time, a broker could have de-registered and re-registered itself in ZK. After 
> the controller finishes processing the current task, it will start processing 
> the logic in the broker change listener. However, it will see no broker 
> change and therefore won't do anything to the restarted broker. This broker 
> will be in a weird state, since the controller never tells it to become the 
> leader of any partition. Yet, the cached metadata in other brokers could 
> still list that broker as the leader for some partitions. Client requests 
> routed to that broker will then get a TopicOrPartitionNotExistException. This 
> broker will continue to be in this bad state until it's restarted again.
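The race can be reproduced with a toy model of the broker-change listener. In this sketch all names are hypothetical and it is not the controller's code: the listener compares the set of registered broker IDs before and after, so a broker that de-registers and re-registers while the lock is held leaves the set unchanged and the restart is missed. Also comparing a per-registration epoch, which is one possible mitigation and an assumption here, exposes the restart.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical view of broker registrations: broker id -> registration epoch.
// Each (re-)registration gets a new, strictly increasing epoch.
final class RegistrationView {
    private final Map<Integer, Long> epochs = new HashMap<>();
    private long nextEpoch = 0;

    void register(int brokerId) {
        epochs.put(brokerId, nextEpoch++);
    }

    void deregister(int brokerId) {
        epochs.remove(brokerId);
    }

    Map<Integer, Long> snapshot() {
        return new HashMap<>(epochs);
    }
}

final class BrokerChangeDetector {
    // ID-only comparison: returns brokers that appeared since the earlier snapshot.
    static Set<Integer> newBrokersById(Map<Integer, Long> before, Map<Integer, Long> after) {
        Set<Integer> added = new HashSet<>(after.keySet());
        added.removeAll(before.keySet());
        return added;
    }

    // Epoch-aware comparison: also reports brokers whose registration changed.
    static Set<Integer> newOrRestartedBrokers(Map<Integer, Long> before, Map<Integer, Long> after) {
        Set<Integer> changed = new HashSet<>();
        for (Map.Entry<Integer, Long> e : after.entrySet()) {
            Long old = before.get(e.getKey());
            if (old == null || !old.equals(e.getValue())) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }
}
```

If broker 2 de-registers and re-registers between the two snapshots, `newBrokersById` returns an empty set, while `newOrRestartedBrokers` reports broker 2.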





[jira] [Updated] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Yogesh BG (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yogesh BG updated KAFKA-5545:
-
Attachment: kafkastreams.log

I have attached the file.

Whenever I get these exceptions in the debug log, the Kafka Streams application is not able to process any further.

> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
> Attachments: kafkastreams.log
>
>
> Hi,
> I have one Kafka broker and one Kafka Streams application.
> Initially the Streams application connects and starts processing data. Then I 
> restart the broker. When the broker restarts, a new IP is assigned to it.
> In the Streams application I have a thread that runs every 5 minutes and checks 
> whether the broker IP has changed; if it has, we clean up the stream, rebuild the 
> topology (I also tried reusing the topology) and start the stream again. I end up 
> with the following exceptions:
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-38] Creating active task 0_5 with assigned partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-41] Creating active task 0_1 with assigned partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-34] Creating active task 0_7 with assigned partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-37] Creating active task 0_3 with assigned partitions [PR-3]
> 11:04:08.036 [StreamThread-45] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-45] Creating active task 0_0 with assigned partitions [PR-0]
> 11:04:08.037 [StreamThread-36] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-36] Creating active task 0_4 with assigned partitions [PR-4]
> 11:04:08.037 [StreamThread-43] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-43] Creating active task 0_6 with assigned partitions [PR-6]
> 11:04:08.038 [StreamThread-48] INFO  o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-48] Creating active task 0_2 with assigned partitions [PR-2]
> 11:04:09.034 [StreamThread-38] WARN  o.a.k.s.p.internals.StreamThread - Could not create task 0_5. Will retry.
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the state directory for task 0_5
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
>     at
> 
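One plausible cause, an assumption not confirmed in this thread, is that the previous `KafkaStreams` instance in the same JVM still holds the lock on the task's state directory when the rebuilt instance starts. Assuming that lock is a `java.nio` file lock, the JDK behaviour behind it can be sketched with `FileChannel.tryLock`: while one channel in the JVM holds a lock on a file, a second attempt from the same JVM fails with `OverlappingFileLockException`, and it succeeds once the first lock is released. `TaskDirLock` and the `.lock` file name are hypothetical.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper modelling a per-task ".lock" file; not Kafka Streams code.
final class TaskDirLock {
    // Opens a new channel on lockFile and tries to take an exclusive lock.
    // Returns the lock (its channel stays open until release() is called),
    // or null if the file is already locked by this JVM or by another process.
    static FileLock tryLock(Path lockFile) throws IOException {
        FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                channel.close(); // held by another process
            }
            return lock;
        } catch (OverlappingFileLockException e) {
            channel.close(); // held by another channel in this JVM
            return null;
        }
    }

    // Releases the lock by closing its channel, as an orderly shutdown would.
    static void release(FileLock lock) throws IOException {
        lock.channel().close();
    }

    // Creates a fresh lock-file path in a temporary directory for the demo.
    static Path tempLockFile() throws IOException {
        return Files.createTempDirectory("task-0_5").resolve(".lock");
    }
}
```

If this is the cause, the restart logic would need to call `close()` on the old `KafkaStreams` instance and let it return before calling `cleanUp()` and `start()` on the new one.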

[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method

2017-07-03 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072622#comment-16072622
 ] 

Matthias J. Sax commented on KAFKA-5528:


[~nravi] I guess the problem is that you are using a single instance of 
{{GenericProcessor}}. The supplier you pass to {{addProcessor}} returns the same 
object on each call, but it needs to return a new instance each time. Can you try 
changing it to 
{{addProcessor("PROCESS", () => new GenericProcessor[T](serDe, decrypt, config), "SOURCE")}}? 
Let us know whether this fixes your problem.

We have been seeing this question regularly lately. I'm going to add an FAQ entry :)
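Why a shared instance breaks can be sketched without Kafka. The supplier given to `addProcessor` is asked for a processor once per task, and each task then calls `init` with its own context. The sketch below uses `java.util.function.Supplier` with hypothetical `TaskContext`, `ModelProcessor` and `TopologyModel` classes in place of the real Kafka types: when the supplier hands out the same object, the last `init` call wins, so every task reads the last task's context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical per-task context; in Kafka Streams this role is played by ProcessorContext.
final class TaskContext {
    final String taskId;

    TaskContext(String taskId) {
        this.taskId = taskId;
    }
}

// Hypothetical processor that, like AbstractProcessor, stores the context passed to init().
final class ModelProcessor {
    private TaskContext context;

    void init(TaskContext context) {
        this.context = context;
    }

    // Returns the task id this processor believes it is running in.
    String currentTask() {
        return context.taskId;
    }
}

final class TopologyModel {
    // Calls the supplier once per task and initialises each result with that task's context.
    static List<ModelProcessor> buildTasks(Supplier<ModelProcessor> supplier, String... taskIds) {
        List<ModelProcessor> processors = new ArrayList<>();
        for (String id : taskIds) {
            ModelProcessor p = supplier.get();
            p.init(new TaskContext(id));
            processors.add(p);
        }
        return processors;
    }
}
```

The suggested fix corresponds to passing `ModelProcessor::new` instead of `() -> shared`: a supplier that constructs a fresh processor on every call, so each task keeps its own context.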






[jira] [Commented] (KAFKA-5530) Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)

2017-07-03 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072596#comment-16072596
 ] 

Matthias J. Sax commented on KAFKA-5530:


Glad you could resolve your problem. We already changed the default of 
{{max.poll.interval.ms}} to infinite (i.e., {{Integer.MAX_VALUE}}). And we 
are working on some more improvements so that Streams is more robust even if the 
value is small (cf. https://issues.apache.org/jira/browse/KAFKA-5152).
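The rule behind the endless rebalancing in this ticket is that a consumer which does not call `poll()` within `max.poll.interval.ms` is considered failed, and the group rebalances. The sketch below is a hypothetical helper, not part of any Kafka API: given the timestamps at which a consumer called `poll()`, it reports whether any gap exceeded the interval, and it shows the property being set through plain `java.util.Properties`.

```java
import java.util.Properties;

// Hypothetical helper; not part of the Kafka client API.
final class PollIntervalCheck {
    // Returns true if any gap between consecutive poll() timestamps (in ms) exceeds
    // maxPollIntervalMs, i.e. the member would be considered failed and the group
    // would rebalance.
    static boolean wouldTriggerRebalance(long[] pollTimesMs, long maxPollIntervalMs) {
        for (int i = 1; i < pollTimesMs.length; i++) {
            if (pollTimesMs[i] - pollTimesMs[i - 1] > maxPollIntervalMs) {
                return true;
            }
        }
        return false;
    }

    // Copies base and sets max.poll.interval.ms explicitly. The value is the caller's
    // choice; it should exceed the worst-case time spent processing between polls.
    static Properties withMaxPollInterval(Properties base, long maxPollIntervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }
}
```

With the Streams default of `Integer.MAX_VALUE` mentioned in the comment, `wouldTriggerRebalance` would only return true for gaps of roughly 24.8 days between polls.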

> Balancer is dancing with KStream all the time, and due to that Kafka cannot 
> work :-)
> 
>
> Key: KAFKA-5530
> URL: https://issues.apache.org/jira/browse/KAFKA-5530
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
> Environment: Linux, Windows
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: streamer_20-topics_1-thread-K-0.11.0.0.log.zip, 
> streamer_20-topics_4-threads-K-0.11.0.0.log.zip, 
> streamer_2-topics_1-thread-K-0.11.0.0.log.zip, 
> streamer_2-topics_4_threads-K-0.11.0.0.log.zip
>
>
> Dear all,
> There are problems with the balancer in KStreams (v. 0.10.2.x) when 
> _num.stream.threads_ is bigger than 1 and the number of input topics is 
> bigger than 1.
> I am using more or less the following setup in the code:
> {code:java}
> // inTopicNames (assumed name): the collection of input topics; kBuilder,
> // streamFilter, ndmNormalizer, outTopicName and streamsConfig are defined elsewhere
> for (String inTopicName : inTopicNames) {
>     KStream<String, String> stringInput =
>         kBuilder.stream(STRING_SERDE, STRING_SERDE, inTopicName);
>     stringInput.filter(streamFilter::passOrFilterMessages)
>         .map(ndmNormalizer)
>         .to(outTopicName);
> }
> streams = new KafkaStreams(kBuilder, streamsConfig);
> streams.cleanUp();
> streams.start();
> {code}
> And if *_num.stream.threads=4_* but there are 2 or more (yet fewer than 
> num.stream.threads) input topics, then application startup is completely 
> self-blocked: it writes endless strange messages to the log and never starts.
> Even more problematic is the case where the number of topics is higher than 
> _num.stream.threads_, which I commented on in *KAFKA-5167 streams task gets 
> stuck after re-balance due to LockException*.
> I am attaching logs for two scenarios:
>  * when: 1 < num.stream.threads < number of topics (KAFKA-5167)
>  * when: 1 < number of topics < num.stream.threads (this ticket).
> I can fully reproduce the behaviour. I even found a workaround, but it is not 
> desirable: when _num.stream.threads=1_, everything works fine :-( (for Kafka 
> 0.10.2.x; 0.11.0.0 does not work at all).
> {code:bash}
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-3] Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 2.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2701
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2701
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-1] Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-ab798efe-16a6-4686-bdee-ccd50c937cd7], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1] Completed validating internal topics in partition

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072587#comment-16072587
 ] 

Matthias J. Sax commented on KAFKA-5545:


Can you do me a favor and attach the log file instead of copying it into the 
comments? It's quite hard to keep an overview otherwise. Thanks a lot!


[jira] [Comment Edited] (KAFKA-5530) Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)

2017-07-03 Thread Seweryn Habdank-Wojewodzki (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072421#comment-16072421
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-5530 at 7/3/17 1:15 PM:
---

The main problem, at least as we observed it in the end, was that our 
_max.poll.interval.ms_ was simply *_too_* small.

Currently we set _max.poll.interval.ms_=100 and Kafka Streams (consuming 
one) is starting properly.

Perhaps it would be good to have a hint in the documentation that 
_max.poll.interval.ms_ should not be too small, as it will cause endless 
rebalancing.

The implicit explanation is here:
If poll() is not called before expiration of this timeout, then the consumer is 
considered failed and the group will rebalance in order to reassign the 
partitions to another member.

But it is not stated explicitly that max.poll.interval.ms should be reasonably 
big :-).


was (Author: habdank):
The main problem, at least as we observed it in the end, was that our 
max.poll.interval.ms was simply *_too_* small.

Currently we set max.poll.interval.ms=100 and Kafka Streams (consuming one) 
is starting properly.

Perhaps it would be good to have a hint in the documentation that 
max.poll.interval.ms should not be too small, as it will cause endless 
rebalancing.

The implicit explanation is here:
If poll() is not called before expiration of this timeout, then the consumer is 
considered failed and the group will rebalance in order to reassign the 
partitions to another member.

But it is not stated explicitly that max.poll.interval.ms should be reasonably 
big :-).


[jira] [Resolved] (KAFKA-5530) Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)

2017-07-03 Thread Seweryn Habdank-Wojewodzki (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki resolved KAFKA-5530.
---
Resolution: Not A Bug

The main problem, at least as we observed it in the end, was that our 
max.poll.interval.ms was simply *_too_* small.

We have now set max.poll.interval.ms=100, and the Kafka Streams application 
(the consuming one) starts properly.

Perhaps the documentation should give a hint that max.poll.interval.ms must not 
be too small, as that causes endless rebalancing.

The documentation explains this only implicitly:
If poll() is not called before expiration of this timeout, then the consumer is 
considered failed and the group will rebalance in order to reassign the 
partitions to another member.

But it does not state explicitly that max.poll.interval.ms should be reasonably 
large :-).
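The quoted documentation sentence implies a simple budget: every record returned by one poll() has to be processed before the next poll(), so max.poll.interval.ms must exceed the worst-case duration of one loop iteration. Below is a minimal plain-Java sketch (no Kafka dependency) that derives the value from that budget and puts it into the Streams properties. The helper names, the per-record cost and the safety factor are hypothetical, and "max.poll.records times per-record time" is only a rule of thumb that ignores commits, rebalances and other work a stream thread does. It assumes, as Kafka Streams does, that plain consumer settings in the Streams properties are passed on to the embedded consumers.

```java
import java.util.Properties;

final class PollIntervalConfigSketch {

    // Rule-of-thumb estimate (an assumption): all records from one poll()
    // are processed before the next poll(), so one loop iteration takes
    // roughly maxPollRecords * perRecordMs in the worst case.
    static long worstCaseLoopMs(int maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    // Builds Kafka Streams properties whose max.poll.interval.ms is the
    // estimated worst-case loop time multiplied by a safety factor.
    static Properties streamsProps(String appId, String bootstrapServers,
                                   int maxPollRecords, long perRecordMs,
                                   double safetyFactor) {
        long intervalMs = (long) Math.ceil(
                worstCaseLoopMs(maxPollRecords, perRecordMs) * safetyFactor);
        Properties props = new Properties();
        props.put("application.id", appId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("num.stream.threads", "4");
        // Plain consumer settings placed here are forwarded by Kafka Streams
        // to its embedded consumers.
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        props.put("max.poll.interval.ms", Long.toString(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps("stream", "localhost:9092", 500, 20, 3.0);
        System.out.println("max.poll.interval.ms=" + props.getProperty("max.poll.interval.ms"));
    }
}
```

With 500 records per poll at an assumed 20 ms each and a safety factor of 3, the sketch sets max.poll.interval.ms to 30000.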

> Balancer is dancing with KStream all the time, and due to that Kafka cannot 
> work :-)
> 
>
> Key: KAFKA-5530
> URL: https://issues.apache.org/jira/browse/KAFKA-5530
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
> Environment: Linux, Windows
>Reporter: Seweryn Habdank-Wojewodzki
> Attachments: streamer_20-topics_1-thread-K-0.11.0.0.log.zip, 
> streamer_20-topics_4-threads-K-0.11.0.0.log.zip, 
> streamer_2-topics_1-thread-K-0.11.0.0.log.zip, 
> streamer_2-topics_4_threads-K-0.11.0.0.log.zip
>
>
> Dear all,
> There are problems with the balancer in KStreams (v. 0.10.2.x) when 
> _num.stream.threads_ is greater than 1 and the number of input topics is 
> greater than 1.
> I am using more or less the following setup in the code:
> {code:java}
> // loop over the input topic names (inTopicNames is a placeholder name)
> for (String inTopicName : inTopicNames) {
>     KStream<String, String> stringInput = kBuilder.stream(STRING_SERDE, STRING_SERDE, inTopicName);
>     stringInput.filter(streamFilter::passOrFilterMessages).map(ndmNormalizer).to(outTopicName);
> }
> streams = new KafkaStreams(kBuilder, streamsConfig);
> streams.cleanUp();
> streams.start();
> {code}
> If *_num.stream.threads=4_* and there are 2 or more inTopicNames, but fewer 
> than num.stream.threads, then application startup is completely 
> self-blocked: it writes endless strange messages to the log and never 
> starts.
> Even more problematic is the case where the number of topics is higher than 
> _num.stream.threads_, which I commented on in *KAFKA-5167 streams task gets 
> stuck after re-balance due to LockException*.
> I am attaching logs for two scenarios:
>  * when: 1 < num.stream.threads < number of topics (KAFKA-5167)
>  * when: 1 < number of topics < num.stream.threads (this ticket).
> I can fully reproduce the behaviour. I even found a workaround, but it is 
> not desirable: when _num.stream.threads=1_, everything works fine :-( (for K 
> v. 0.10.2.x; v. 0.11.0.0 does not work at all).
> {code:bash}
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-3] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 2.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread 
> [StreamThread-1] Constructed client metadata 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, 
> 

[jira] [Created] (KAFKA-5552) testTransactionalProducerTopicAuthorizationExceptionInCommit fails

2017-07-03 Thread Eno Thereska (JIRA)
Eno Thereska created KAFKA-5552:
---

 Summary: 
testTransactionalProducerTopicAuthorizationExceptionInCommit fails 
 Key: KAFKA-5552
 URL: https://issues.apache.org/jira/browse/KAFKA-5552
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.11.0.0
Reporter: Eno Thereska
 Fix For: 0.11.1.0


Got a unit test error: 
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5877/

Error Message

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
Stacktrace

org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:524)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginCommit(TransactionManager.java:190)
at org.apache.kafka.clients.producer.KafkaProducer.commitTransaction(KafkaProducer.java:583)
at kafka.api.AuthorizerIntegrationTest.testTransactionalProducerTopicAuthorizationExceptionInCommit(AuthorizerIntegrationTest.scala:1027)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:147)
at 
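The error message, together with the `maybeFailWithError` and `beginCommit` frames at the top of the trace, points to a fail-fast guard: once the producer's transaction state has recorded an error, later transactional calls such as `commitTransaction()` are refused instead of attempting the commit. The following is a hypothetical, heavily simplified Java model of that pattern only. `ToyTransactionManager` and `recordError` are invented names, this is not Kafka's `TransactionManager`, and it does not claim to explain why this particular test failed.

```java
// Hypothetical, simplified model of a fail-fast transactional guard.
// NOT Kafka's TransactionManager; names are invented for illustration.
final class ToyTransactionManager {
    // First error recorded by an earlier operation (e.g. a rejected send);
    // null while the manager is healthy.
    private RuntimeException lastError;

    void recordError(RuntimeException error) {
        if (lastError == null) {
            lastError = error; // keep the original cause, ignore follow-ups
        }
    }

    // Guard named after the maybeFailWithError frame in the trace: once an
    // error has been recorded, every later transactional call is refused.
    private void maybeFailWithError() {
        if (lastError != null) {
            throw new IllegalStateException(
                    "Cannot execute transactional method because we are in an error state",
                    lastError);
        }
    }

    void beginCommit() {
        maybeFailWithError();
        // A real implementation would start the commit here.
    }

    public static void main(String[] args) {
        ToyTransactionManager tm = new ToyTransactionManager();
        tm.beginCommit(); // healthy: no exception
        tm.recordError(new RuntimeException("earlier send was rejected"));
        try {
            tm.beginCommit();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

Keeping only the first recorded error is a design choice of the sketch: the exception thrown later then carries the root cause rather than a secondary failure.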

[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072228#comment-16072228
 ] 

Yogesh BG edited comment on KAFKA-5545 at 7/3/17 10:27 AM:
---

I found that if the stream is closed successfully, it is able to re-establish 
the connection to the new broker IP and continue processing data.

But sometimes the previous stream does not close properly, because some of its 
threads keep trying to reconnect to the old broker IP, which is no longer 
available, and keep logging DEBUG exceptions. I have attached the debug log. In 
this situation the stream does not process any further data.

Here is the logic used to re-establish the connection.
The close timeout is 60 sec.


{code:java}
private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort, useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
                logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers
                        + ", old bootstrap server are " + oldBootstrapServerString);
                boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString);
                if (isChanged) {
                    String bootstrapServerString = bootstrapServersStr(bootstrapServers);
                    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString);
                    logger.info("Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    close();
                    streams = new KafkaStreams(buildTopology(config), config);
                    logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    streams.cleanUp();
                    start();
                    logger.info("Completed restart of kafka streams connection to new broker with configuration "
                            + config);
                }
            } catch (Throwable ex) {
                logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage()
                        + ", will retry again", ex);
            }
        }
    }, 0, refreshInterval, TimeUnit.MINUTES);
}

{code}
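The snippet relies on two helpers that are not shown, `checkForChangeInBootstrapServers` and `bootstrapServersStr`. A minimal sketch of what they might look like, assuming `getBootstrapServer` returns a list of `host:port` strings (the element type is not visible above). Both implementations are hypothetical: the comparison ignores ordering and whitespace so that a reshuffled registry response does not trigger a needless restart, and an empty registry response is treated as "no change" so the configured servers are never replaced by an empty list.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical implementations of the helpers referenced in the comment above.
final class BootstrapServerHelpers {

    // Joins host:port entries into the comma-separated form used by bootstrap.servers.
    static String bootstrapServersStr(List<String> servers) {
        return String.join(",", servers);
    }

    // True when the set of host:port entries differs from the configured
    // comma-separated string; order and surrounding whitespace are ignored.
    static boolean checkForChangeInBootstrapServers(List<String> newServers, String oldServers) {
        Set<String> newSet = new HashSet<>();
        for (String entry : newServers) {
            if (!entry.trim().isEmpty()) {
                newSet.add(entry.trim());
            }
        }
        if (newSet.isEmpty()) {
            return false; // empty registry response: keep the current servers
        }
        Set<String> oldSet = new HashSet<>();
        if (oldServers != null) {
            for (String entry : oldServers.split(",")) {
                if (!entry.trim().isEmpty()) {
                    oldSet.add(entry.trim());
                }
            }
        }
        return !newSet.equals(oldSet);
    }

    public static void main(String[] args) {
        List<String> fromRegistry = Arrays.asList("10.0.0.7:9092");
        System.out.println(checkForChangeInBootstrapServers(fromRegistry, "10.0.0.5:9092")); // true
        System.out.println(bootstrapServersStr(fromRegistry)); // 10.0.0.7:9092
    }
}
```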



was (Author: yogeshbelur):
I found that if the stream is closed successfully, it is able to re-establish 
the connection to the new broker IP and continue processing data.

But sometimes the previous stream does not close properly, because some of its 
threads keep trying to reconnect to the old broker IP, which is no longer 
available, and keep logging DEBUG exceptions. I have attached the debug log. In 
this situation the stream does not process any further data.

Here is the logic used to re-establish the connection.
The close timeout is 60 sec.


{code:java}
private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort, useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
 

[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip

2017-07-03 Thread Yogesh BG (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16072228#comment-16072228
 ] 

Yogesh BG commented on KAFKA-5545:
--

I found that if the stream is closed successfully, it is able to re-establish 
the connection to the new broker IP and continue processing data.

But sometimes the previous stream does not close properly, because some of its 
threads keep trying to reconnect to the old broker IP, which is no longer 
available, and keep logging DEBUG exceptions. I have attached the debug log. In 
this situation the stream does not process any further data.

Here is the logic used to re-establish the connection.
The close timeout is 60 sec.


{code:java}
private ScheduledFuture setupDiscovery(final AbstractConfiguration configInstance, int refreshInterval,
        final String vipAddress, final boolean useSecurePort, final boolean useHostNames) {
    return executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                List bootstrapServers = getBootstrapServer(configInstance, vipAddress, useSecurePort, useHostNames);
                String oldBootstrapServerString = config.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
                logger.info("New bootstrap servers obtained from registry server are " + bootstrapServers
                        + ", old bootstrap server are " + oldBootstrapServerString);
                boolean isChanged = checkForChangeInBootstrapServers(bootstrapServers, oldBootstrapServerString);
                if (isChanged) {
                    String bootstrapServerString = bootstrapServersStr(bootstrapServers);
                    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerString);
                    logger.info("Closing connection to oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    close();
                    streams = new KafkaStreams(buildTopology(config), config);
                    logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "].");
                    streams.cleanUp();
                    start();
                    logger.info("Completed restart of kafka streams connection to new broker with configuration "
                            + config);
                }
            } catch (Throwable ex) {
                logger.error("discovery of kafka broker instances failed with reason : " + ex.getMessage()
                        + ", will retry again", ex);
            }
        }
    }, 0, refreshInterval, TimeUnit.MINUTES);
}

{code}


> Kafka Stream not able to successfully restart over new broker ip
> 
>
> Key: KAFKA-5545
> URL: https://issues.apache.org/jira/browse/KAFKA-5545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Yogesh BG
>Priority: Critical
>
> Hi,
> I have one Kafka broker and one Kafka Streams application.
> Initially the Kafka Streams application connects and starts processing data. 
> Then I restart the broker; when the broker restarts, it is assigned a new IP.
> In the Kafka Streams application I have a thread running at a 5-minute 
> interval that checks whether the broker IP has changed; if it has, we clean 
> up the stream, rebuild the topology (I also tried reusing the topology), and 
> start the stream again. I end up with the following exceptions.
> 11:04:08.032 [StreamThread-38] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-38] Creating active task 0_5 with assigned 
> partitions [PR-5]
> 11:04:08.033 [StreamThread-41] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-41] Creating active task 0_1 with assigned 
> partitions [PR-1]
> 11:04:08.036 [StreamThread-34] INFO  o.a.k.s.p.internals.StreamThread - 
> stream-thread [StreamThread-34] Creating active task 0_7 with assigned 
> partitions [PR-7]
> 11:04:08.036 [StreamThread-37] INFO  o.a.k.s.p.internals.StreamThread - 
> 

[jira] [Assigned] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2017-07-03 Thread Evgeny Veretennikov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Evgeny Veretennikov reassigned KAFKA-3539:
--

Assignee: Evgeny Veretennikov  (was: Manikumar)

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>Reporter: Oleg Zhurakousky
>Assignee: Evgeny Veretennikov
>Priority: Critical
>
> You can get more details from the us...@kafka.apache.org mailing list by 
> searching for the thread with the subject "KafkaProducer block on send".
> The bottom line is that a method that returns a Future must never block, 
> since blocking essentially violates the Future contract: a Future-returning 
> method is specifically designed to return immediately, passing control back 
> to the user to check for completion, cancel, etc.
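The Future contract described in this issue can be illustrated with a small plain-Java sketch: `sendAsync` returns a `CompletableFuture` immediately and moves the slow work onto a background executor, so the caller keeps control. `NonBlockingSender`, `sendAsync`, `slowSend` and the 500 ms delay are hypothetical, and this is not how KafkaProducer is implemented (there, send() can block while waiting for metadata or buffer space, bounded by max.block.ms).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sender illustrating a Future-returning method that never blocks.
final class NonBlockingSender {
    // Single background thread that performs the slow part of each send.
    // Daemon thread, so the JVM can exit even if shutdown() is never called.
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sender-io");
        t.setDaemon(true);
        return t;
    });

    // Returns at once; the caller decides whether to wait (join/get),
    // check for completion (isDone) or cancel.
    CompletableFuture<Long> sendAsync(String topic, String value) {
        return CompletableFuture.supplyAsync(() -> slowSend(topic, value), io);
    }

    // Stand-in for blocking work such as waiting for metadata or buffer
    // space; returns a fake "offset" (here just the value length).
    private long slowSend(String topic, String value) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value.length();
    }

    void shutdown() {
        io.shutdown();
    }

    public static void main(String[] args) {
        NonBlockingSender sender = new NonBlockingSender();
        long start = System.nanoTime();
        CompletableFuture<Long> result = sender.sendAsync("demo-topic", "hello");
        long returnedAfterMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("sendAsync returned after " + returnedAfterMs + " ms, done=" + result.isDone());
        System.out.println("offset=" + result.join());
        sender.shutdown();
    }
}
```

One consequence of this design: a failure in the background work surfaces through the returned future (for example when calling join()), not as an exception thrown by sendAsync itself.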



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)