[jira] [Updated] (KAFKA-6706) NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 broker upgrade

2018-03-28 Thread Di Shang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Shang updated KAFKA-6706:

Description: 
We have two clusters, A and B, with 4 brokers each, and we use MirrorMaker to replicate topics from A to B.
 We recently upgraded our brokers from 0.10.2.0 to 1.0.0; after the upgrade the MirrorMaker task started showing producer errors and intermittently dying.
 We tried both the 1.0.0 and the 0.10.2.0 MirrorMaker; both have the same problem. After downgrading cluster B's brokers back to 0.10.2.0 the problem went away, so we think it is a server-side problem.

There are two types of errors: REQUEST_TIMED_OUT and NETWORK_EXCEPTION. For testing, I used a topic *logging* with 20 partitions and 3 replicas (the same on both cluster A and B); the source topic has 50+ million messages.

(This is from MirrorMaker 1.0 at INFO level; the 0.10.2.0 log is very similar.)
{noformat}
22 Mar 2018 02:16:07.407 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 35122 on 
topic-partition logging-7, retrying (2147483646 attempts left). Error: 
REQUEST_TIMED_OUT
 22 Mar 2018 02:17:49.731 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 51572 on 
topic-partition logging-7, retrying (2147483646 attempts left). Error: 
REQUEST_TIMED_OUT
 22 Mar 2018 02:18:33.903 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 57785 on 
topic-partition logging-5, retrying (2147483646 attempts left). Error: 
REQUEST_TIMED_OUT
 22 Mar 2018 02:21:21.399 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 85406 on 
topic-partition logging-18, retrying (2147483646 attempts left). Error: 
REQUEST_TIMED_OUT
 22 Mar 2018 02:25:22.278 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 128047 on 
topic-partition logging-5, retrying (2147483646 attempts left). Error: 
REQUEST_TIMED_OUT
 22 Mar 2018 02:26:17.154 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 137049 on 
topic-partition logging-18, retrying (2147483646 attempts left). Error: 
REQUEST_TIMED_OUT
 22 Mar 2018 02:27:57.358 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 153976 on 
topic-partition logging-5, retrying (2147483646 attempts left). Error: 
REQUEST_TIMED_OUT
 22 Mar 2018 02:27:57.779 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 154077 on 
topic-partition logging-2, retrying (2147483646 attempts left). Error: 
NETWORK_EXCEPTION
 22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 154077 on 
topic-partition logging-10, retrying (2147483646 attempts left). Error: 
NETWORK_EXCEPTION
 22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 154077 on 
topic-partition logging-18, retrying (2147483646 attempts left). Error: 
NETWORK_EXCEPTION
 22 Mar 2018 02:27:57.781 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 154077 on 
topic-partition logging-14, retrying (2147483646 attempts left). Error: 
NETWORK_EXCEPTION
 22 Mar 2018 02:27:57.781 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 
clientId=producer-1] Got error produce response with correlation id 154077 on 
topic-partition logging-6, retrying (2147483646 attempts left). Error: 
NETWORK_EXCEPTION
 22 Mar 2018 02:29:12.378 [kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer 

[jira] [Updated] (KAFKA-6706) NETWORK_EXCEPTION and REQUEST_TIMED_OUT in mirrormaker producer after 1.0 broker upgrade

2018-03-28 Thread Di Shang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Shang updated KAFKA-6706:

   Priority: Blocker  (was: Major)

[jira] [Commented] (KAFKA-6717) TopicPartition Assigned twice to a consumer group for 2 consumer instances

2018-03-28 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418435#comment-16418435
 ] 

Ewen Cheslack-Postava commented on KAFKA-6717:
--

Are both assigned the partition in the same generation? As consumers join the 
group, it will rebalance and change the assignment. If you are just looking at 
which consumers are assigned which partitions, it could appear that two of them 
are assigned the same partitions at the same time.

I see you are using the NoOpConsumerRebalanceListener. With this, you wouldn't 
see when partitions were assigned or revoked. What are you doing to verify that 
both consumer instances are assigned the same partitions at the same time?

Without correct handling of partition assignments and revocation, you 
definitely could see data processed twice. In fact, without additional steps 
taken to ensure no duplicates, *at least once* handling is what Kafka consumers 
would normally provide as long as they handle offset commits properly.
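One way to verify this is to replace the NoOpConsumerRebalanceListener with a listener that logs every revocation and assignment. The sketch below is only illustrative (the class name and the logging are assumptions, not code from the reporter's application), but it makes the rebalance timeline visible so that overlapping assignments across generations can be told apart from genuinely duplicate assignments.

{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: log each rebalance so it is clear when partitions are
// revoked from one instance and assigned to another.
public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println(System.currentTimeMillis() + " revoked:  " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println(System.currentTimeMillis() + " assigned: " + partitions);
    }
}
{code}

Passing an instance of this listener to {{subscribe()}} instead of the no-op listener would show whether the "duplicate" assignment is really two instances owning the same partition in the same generation, or just a stale view across a rebalance.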

> TopicPartition Assigned twice to a consumer group for 2 consumer instances 
> --
>
> Key: KAFKA-6717
> URL: https://issues.apache.org/jira/browse/KAFKA-6717
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Yuancheng PENG
>Priority: Major
>
> I'm using {{StickyAssignor}} for consuming more than 100 topics matching a certain pattern.
> There are 10 consumers with the same group id.
> I expected each topic-partition to be assigned to only one consumer instance. However, some topic-partitions are assigned twice, to 2 different instances, hence the consumer group processes duplicate messages.
> {code:java}
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> Collections.singletonList(StickyAssignor.class));
> KafkaConsumer c = new KafkaConsumer<>(props);
> c.subscribe(Pattern.compile(TOPIC_PATTERN), new 
> NoOpConsumerRebalanceListener());
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418337#comment-16418337
 ] 

Boyang Chen commented on KAFKA-6657:


[~guozhang] thanks Guozhang! I will take a look at this one.

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main consumer, (2) the restore consumer, and (3) the global consumer (which is a restore consumer as well atm).
> For some use cases, it's required to set different configs for different consumers. Thus, we should add two new prefixes for the restore and global consumers. We might also consider extending `KafkaClientSupplier` and adding a `getGlobalConsumer()` method.
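For context, here is a minimal sketch of how the existing `producer.`/`consumer.` prefixes are applied today. The restore/global prefixes discussed in this ticket do not exist yet, so they appear only in a comment as a hypothetical addition.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PrefixedConfigSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Existing mechanism: per-client overrides via the "consumer." / "producer." prefixes.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "500");
        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "100");

        // This ticket proposes analogous prefixes (e.g. a hypothetical "restore.consumer.")
        // so the restore and global consumers could be tuned independently of the main consumer.
        return props;
    }
}
{code}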



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread Boyang Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-6657:
--

Assignee: Boyang Chen

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main consumer, (2) the restore consumer, and (3) the global consumer (which is a restore consumer as well atm).
> For some use cases, it's required to set different configs for different consumers. Thus, we should add two new prefixes for the restore and global consumers. We might also consider extending `KafkaClientSupplier` and adding a `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6725) Indicate "isClosing" in the SinkTaskContext

2018-03-28 Thread Matt Farmer (JIRA)
Matt Farmer created KAFKA-6725:
--

 Summary: Indicate "isClosing" in the SinkTaskContext
 Key: KAFKA-6725
 URL: https://issues.apache.org/jira/browse/KAFKA-6725
 Project: Kafka
  Issue Type: New Feature
Reporter: Matt Farmer


Addition of the isClosing method to SinkTaskContext per this KIP.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75977607
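A rough sketch of how a sink task might use the proposed method once it exists; {{isClosing()}} is the addition described by the KIP and is hypothetical here, while everything else is the standard Connect {{SinkTask}} API.

{code:java}
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {
    @Override public String version() { return "0.0.1"; }
    @Override public void start(Map<String, String> props) { }
    @Override public void stop() { }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Normally buffer records and write them to the external system in batches.
        if (context.isClosing()) {   // proposed method from the KIP, not in released APIs
            // The task is about to be closed: flush whatever is buffered now instead of
            // waiting for the next batch boundary.
        }
        // ... write records ...
    }
}
{code}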



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6376:
---
Description: 
Copy this from KIP-210 discussion thread:

{quote}
Note that currently we have two metrics for `skipped-records` on different
levels:

1) on the highest level, the thread-level, we have a `skipped-records`,
that records all the skipped records due to deserialization errors.
2) on the lower processor-node level, we have a
`skippedDueToDeserializationError`, that records the skipped records on
that specific source node due to deserialization errors.


So you can see that 1) does not cover any other scenarios and can just be
thought of as an aggregate of 2) across all the tasks' source nodes.
However, there are other places that can cause a record to be dropped, for
example:

1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
dropped due to window elapsed.
2) KIP-210: records could be dropped on the producer side.
3) records could be dropped during user-customized processing on errors.
{quote}

[~guozhang] Not sure what you mean by "3) records could be dropped during 
user-customized processing on errors."

Btw: we also drop records with {{null}} key and/or value for certain DSL operations. This should be included as well.

KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics
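As a side note, the existing thread-level metric can already be inspected on a running application. The sketch below is only illustrative and filters by metric name rather than hard-coding a group, since the exact names may change as part of this KIP.

{code:java}
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class SkippedRecordsProbe {
    // Print every metric whose name mentions "skipped-records" for a running
    // KafkaStreams instance; avoids assuming the exact metric group.
    public static void printSkippedRecordMetrics(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (entry.getKey().name().contains("skipped-records")) {
                System.out.println(entry.getKey().name() + " = " + entry.getValue().value());
            }
        }
    }
}
{code}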

  was:
Copy this from KIP-210 discussion thread:

{quote}
Note that currently we have two metrics for `skipped-records` on different
levels:

1) on the highest level, the thread-level, we have a `skipped-records`,
that records all the skipped records due to deserialization errors.
2) on the lower processor-node level, we have a
`skippedDueToDeserializationError`, that records the skipped records on
that specific source node due to deserialization errors.


So you can see that 1) does not cover any other scenarios and can just be
thought of as an aggregate of 2) across all the tasks' source nodes.
However, there are other places that can cause a record to be dropped, for
example:

1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
dropped due to window elapsed.
2) KIP-210: records could be dropped on the producer side.
3) records could be dropped during user-customized processing on errors.
{quote}

[~guozhang] Not sure what you mean by "3) records could be dropped during 
user-customized processing on errors."

Btw: we also drop record with {{null}} key and/or value for certain DSL 
operations. This should be included as well.


> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop record with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.
> KIP: : 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6376:
---
Labels: kip  (was: needs-kip)

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop record with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.
> KIP: : 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418207#comment-16418207
 ] 

Matthias J. Sax commented on KAFKA-6437:


I mean that some topics are available but others are not (i.e., if there are multiple input topics). There are cases for which Kafka Streams would not fail but would just process the available topics atm.

I agree that KAFKA-6520 is different; however, it's somewhat related (-> the state "RUNNING" is confusing and not really appropriate). I just wanted to point out the relationship. Not sure if we should introduce DISCONNECTED and IDLE or just one state for both. I mentioned it only to give the "global picture".

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the missing input topic. This preprocessing won't happen without the topic, creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418195#comment-16418195
 ] 

John Roesler commented on KAFKA-6376:
-

I have created a KIP to solve these issues: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop record with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-28 Thread Mariam John (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418185#comment-16418185
 ] 

Mariam John commented on KAFKA-6437:


[~mjsax] what do you mean by partially available input topics? KAFKA-6520 deals with the case where the Kafka broker is down but the Kafka Streams apps connected to it still report the state as RUNNING. I think, like you suggested, we could have an IDLE state in both cases and log a different warning for each case. For example, in this case it would be because of missing input topics, and in the case of KAFKA-6520 it would be because the app is unable to connect to the broker.

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the missing input topic. This preprocessing won't happen without the topic, creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6417) plugin.path pointing at a plugin directory causes ClassNotFoundException

2018-03-28 Thread Chris Egerton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418143#comment-16418143
 ] 

Chris Egerton commented on KAFKA-6417:
--

[~cotedm] Alright, took another look at the Connect code base and have some bad 
news :( It looks like there are enough different places that the 
{{ClassNotFoundException}} can be thrown from at enough different times that 
catching it and displaying a useful error message would be harder than I 
originally thought. On top of that, it would be pretty difficult to 
differentiate between "It looks like this JAR is missing because you specified 
an incorrect plugin path" and "This JAR is actually missing" in most cases, 
since the exception will have occurred after the initial plugin class was 
loaded and outside any code that deals with plugin isolation.

 

As far as forcing all JAR files to go into their own directories goes, I have a 
few more thoughts. If someone tries to start up connect, gets an error about 
JAR files found on the plugin path (possibly also with the information that 
"All JAR files must be placed in a subdirectory of a plugin path directory" or 
something along those lines), and wants to fix the issue, do you think it's 
likely that the next step for them would be to then create a subdirectory and 
then place all of their uber JARs inside that directory? It may seem like a 
reasonable assumption that uber JARs would be loaded independently of each 
other, but if we don't have a meaningful way of differentiating them from 
non-uber JARs in the Connect framework, we'd just end up loading all of the 
plugins in that directory with the same classloader, potentially leading to the 
same overlapping dependency issues that the whole {{plugin.path}} configuration 
is meant to prevent.

 

> plugin.path pointing at a plugin directory causes ClassNotFoundException
> 
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the 
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination 
> of: a) directories immediately containing jars with plugins and their 
> dependencies b) uber-jars with plugins and their dependencies c) directories 
> immediately containing the package directory structure of classes of plugins 
> and their dependencies 
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure 
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc. 
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the 
> resulting behavior is that dependencies for {{myplugin1}} are not properly 
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to 
> debug. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418131#comment-16418131
 ] 

Matthias J. Sax commented on KAFKA-6437:


[~wojda] Thanks for the follow-up. With regard to the "RUNNING" status, it seems to be related to KAFKA-6520.

About this ticket and KAFKA-6720: I agree that it is opinion based whether failing or logging is the right approach. Note that Kafka Streams inherits its behavior from KafkaConsumer: it also idles if the input topics don't exist. Therefore, it might even be required to change the consumer, affecting even more developers. Maybe [~guozhang] can shed some light on why the consumer is designed this way (I am sure there are good reasons for it).

About adding a new config: it might be a solution. However, from our experience we learned that having too many configs can be confusing for users -- thus, we tend to be conservative about adding new configs if there is a better solution. I am not saying we should not introduce a "fail-on-missing-topic" config, I am just saying we should discuss it in detail before we make a decision. Do you think that fixing KAFKA-6520 would be an acceptable alternative solution instead of throwing an exception? Meaning, introducing a new "IDLE" state if the input topics are missing? The question would be how to handle partially available input topics. (Just putting out ideas here...)

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the missing input topic. This preprocessing won't happen without the topic, creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418117#comment-16418117
 ] 

Guozhang Wang commented on KAFKA-6657:
--

Thanks [~sssanthalingam], I'm unassigning since the reporter of 
https://issues.apache.org/jira/browse/KAFKA-6723 would like to work on it.

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main consumer, (2) the restore consumer, and (3) the global consumer (which is a restore consumer as well atm).
> For some use cases, it's required to set different configs for different consumers. Thus, we should add two new prefixes for the restore and global consumers. We might also consider extending `KafkaClientSupplier` and adding a `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-6657:


Assignee: (was: siva santhalingam)

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main consumer, (2) the restore consumer, and (3) the global consumer (which is a restore consumer as well atm).
> For some use cases, it's required to set different configs for different consumers. Thus, we should add two new prefixes for the restore and global consumers. We might also consider extending `KafkaClientSupplier` and adding a `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6723.
--
Resolution: Duplicate

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.record` config for both the restore consumer and the normal stream consumer. In reality, they handle different processing workloads, and in order to speed up restoration, the restore consumer is supposed to have a higher throughput by setting `max.poll.record` higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name), so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418114#comment-16418114
 ] 

Guozhang Wang commented on KAFKA-6723:
--

Boyang, I'm going to mark this ticket as subsumed by KAFKA-6657; please feel free to take over KAFKA-6657, since Siva has not had time to work on it lately.

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.record` config for both the restore consumer and the normal stream consumer. In reality, they handle different processing workloads, and in order to speed up restoration, the restore consumer is supposed to have a higher throughput by setting `max.poll.record` higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name), so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-03-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418111#comment-16418111
 ] 

Matthias J. Sax commented on KAFKA-5253:


Thanks for working on this. If the fix does not require a public API change, we don't need a KIP -- when I created the KIP, I was assuming that we would need a public API change for the fix. Please go ahead and open the PR and we can take it from there.

> TopologyTestDriver must handle streams created with patterns
> 
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
> Below is a unit test explaining what I understand should happen, but which is failing.
> Explicitly adding a source topic matching the topic pattern generates an exception, as the topology builder explicitly checks for overlapping topic names and patterns, in any order of adding pattern and topic. So it is intended behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier processorSupplier = new 
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-28 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418106#comment-16418106
 ] 

Matthias J. Sax commented on KAFKA-5882:


Changing the structure of the topology is not a compatible change. Are you doing rolling-bounce upgrades? For that case, a failure is expected. You would need to take the whole application offline (i.e., shut down all old instances) and then redeploy the new application. With this offline upgrade, it should work. Or do you see the NPE even if you do an offline upgrade?

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-28 Thread Vikas Tikoo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418101#comment-16418101
 ] 

Vikas Tikoo commented on KAFKA-5882:


Guozhang Wang: Here's the rest of the log trace:
{code}
 StreamsException
 stream-thread [pipeline-helper-e7f04112-3f80-49a2-8585-f5ecf63b009a-StreamThread-1] Failed to rebalance.
 org.apache.kafka.streams.processor.internals.StreamThread in pollRequests at line 589
 org.apache.kafka.streams.processor.internals.StreamThread in runLoop at line 553
 org.apache.kafka.streams.processor.internals.StreamThread in run at line 527
{code}

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6598) Kafka to support using ETCD beside Zookeeper

2018-03-28 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418097#comment-16418097
 ] 

Ismael Juma commented on KAFKA-6598:


Here's a link: 
https://lists.apache.org/thread.html/2bc187040051008452b40b313db06b476c248ef7a5ed7529afe7b118@1448997154@%3Cdev.kafka.apache.org%3E

> Kafka to support using ETCD beside Zookeeper
> 
>
> Key: KAFKA-6598
> URL: https://issues.apache.org/jira/browse/KAFKA-6598
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Sebastian Toader
>Priority: Major
>
> The current Kafka implementation is bound to {{Zookeeper}} to store its 
> metadata for forming a cluster of nodes (producer/consumer/broker). 
> As Kafka is becoming popular for streaming in various environments where 
> {{Zookeeper}} is either not easy to deploy/manage or there are better 
> alternatives to it, there is a need 
> to run Kafka with a metastore implementation other than {{Zookeeper}}.
> {{etcd}} can provide the same semantics as {{Zookeeper}} for Kafka, and since 
> {{etcd}} is the favored choice in certain environments (e.g. Kubernetes), 
> Kafka should be able to run with {{etcd}}.
> From the user's point of view it should be straightforward to configure the use of 
> {{etcd}} by simply specifying a connection string that points to an {{etcd}} 
> cluster.
> To avoid introducing instability, the original interfaces should be kept and 
> only the low-level {{Zookeeper}} API calls should be replaced with {{etcd}} 
> API calls in case Kafka is configured 
> to use {{etcd}}.
> In the long run (which is out of scope of this jira) there should be an 
> abstraction layer in Kafka which various metastore implementations would then 
> implement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6720) Inconsistent Kafka Streams behaviour when topic does not exist

2018-03-28 Thread Daniel Wojda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418090#comment-16418090
 ] 

Daniel Wojda commented on KAFKA-6720:
-

My mistake [~johnma]. That makes sense. Thank you for the explanation!

> Inconsistent Kafka Streams behaviour when topic does not exist
> --
>
> Key: KAFKA-6720
> URL: https://issues.apache.org/jira/browse/KAFKA-6720
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Daniel Wojda
>Priority: Minor
>
> When Kafka Streams starts, it reads metadata about the topics used in the topology
>  and their partitions. If the topology of that stream contains a stateful operation 
> like #join, and a topic does not exist, a 
> [TopologyBuilderException|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L719]
>  will be thrown.
> In case of a stream with a simple topology with stateless operations only, like 
> #mapValue, and a topic that does not exist, Kafka Streams does not throw any 
> exception, just logs a warning:
>  ["log.warn("No partitions found for topic {}", 
> topic);"|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L435]
>  
> I believe the behaviour of Kafka Streams in both cases should be the same, 
> and it should throw TopologyBuilderException.
> I am more than happy to prepare a Pull Request if it is a valid issue.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6720) Inconsistent Kafka Streams behaviour when topic does not exist

2018-03-28 Thread Mariam John (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418089#comment-16418089
 ] 

Mariam John commented on KAFKA-6720:


[~wojda] if you look at the resolved tag, it is marked as duplicate. The only 
way to mark it as duplicate is to resolve it as duplicate. If you look at 
KAFKA-6437, you will see some comments that [~guozhang] added yesterday. We 
will use that defect to resolve this defect as well as KAFKA-6437. Hope that 
makes sense. I will add a comment as soon as I upload a fix. Thank you.

> Inconsistent Kafka Streams behaviour when topic does not exist
> --
>
> Key: KAFKA-6720
> URL: https://issues.apache.org/jira/browse/KAFKA-6720
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Daniel Wojda
>Priority: Minor
>
> When Kafka Streams starts, it reads metadata about the topics used in the topology
>  and their partitions. If the topology of that stream contains a stateful operation 
> like #join, and a topic does not exist, a 
> [TopologyBuilderException|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L719]
>  will be thrown.
> In case of a stream with a simple topology with stateless operations only, like 
> #mapValue, and a topic that does not exist, Kafka Streams does not throw any 
> exception, just logs a warning:
>  ["log.warn("No partitions found for topic {}", 
> topic);"|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L435]
>  
> I believe the behaviour of Kafka Streams in both cases should be the same, 
> and it should throw TopologyBuilderException.
> I am more than happy to prepare a Pull Request if it is a valid issue.
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6437) Streams does not warn about missing input topics, but hangs

2018-03-28 Thread Daniel Wojda (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418073#comment-16418073
 ] 

Daniel Wojda commented on KAFKA-6437:
-

I would like to add my comment as a user of Kafka Streams and author of 
KAFKA-6720.
An important piece of missing information here is that if you start a Kafka Streams 
application without its input topics created, it will log a warning and stay in this 
"idle" state until you create the topic(s) *AND* a rebalance happens. If you 
check the status of the stream it will be "RUNNING". What is more, please correct 
me if I'm wrong, checking consumer lag will not help either, because the lag will be 0 
(the number of messages in a non-existing topic is 0). 

As [~mjsax] already mentioned, "it's well documented that you need to create all 
input topics before you start your application", so in my opinion "stopping the 
world and failing" is a better option than starting a "zombie" application. 
I understand that Kafka Streams has many users and other developers may have a 
different opinion than mine, but in that case I'd suggest introducing a new 
config, e.g. "fail-on-missing-topic"? WDYT?

> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
>  Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it 
> hangs "in the middle" of the topology (at …9, see below). Only parts of 
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes 
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
>   ProcessorTopology:
>   KSTREAM-SOURCE-11:
>   topics: 
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
>   children:   [KTABLE-AGGREGATE-12]
>   KTABLE-AGGREGATE-12:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KTABLE-TOSTREAM-20]
>   KTABLE-TOSTREAM-20:
>   children:   [KSTREAM-SINK-21]
>   KSTREAM-SINK-21:
>   topic:  data_udr_month_customer_aggregration
>   KSTREAM-SOURCE-17:
>   topics: 
> [mystreams_app-KSTREAM-MAP-14-repartition]
>   children:   [KSTREAM-LEFTJOIN-18]
>   KSTREAM-LEFTJOIN-18:
>   states: 
> [KTABLE-AGGREGATE-STATE-STORE-09]
>   children:   [KSTREAM-SINK-19]
>   KSTREAM-SINK-19:
>   topic:  data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0, 
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the 
> missing input topic. This preprocessing won't happen without the topic, 
> creating a huge backlog of data.
> *Fix*
> Issue a `warn` or `error` level message at start to inform about the missing 
> topic and its consequences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418023#comment-16418023
 ] 

John Roesler edited comment on KAFKA-6376 at 3/28/18 8:00 PM:
--

Just doing a little initial archaeology...

 

The thread-level metric is updated here:

org.apache.kafka.streams.processor.internals.StreamThread#addRecordsToTasks:
{quote}streamsMetrics.skippedRecordsSensor.record(records.count() - 
numAddedRecords, timerStartedMs);{quote}
 

records.count() - numAddedRecords counts this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) 
timestamp, the record is skipped and not added to the queue for processing
{quote}
 

Ah, but it drills down into 
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we 
have
{quote}final ConsumerRecord record = 
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
 continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
 continue;
}{quote}
All other code paths in there either add the record or throw.

 

The former of these cases is accounted in 
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
 catch (final Exception deserializationException) {

...
else {
 sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
 return null;
}{quote}
all other paths either return a record or throw

 

 

 


was (Author: vvcephei):
Just doing a little initial archaeology...

 

Thread-level metric looks like it only counts this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) 
timestamp, the record is skipped and not added to the queue for processing
{quote}
 

Ah, but it drills down into 
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we 
have
{quote}final ConsumerRecord record = 
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
 continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
 continue;
}{quote}
All other code paths in there either add the record or throw.

 

The former of these cases is accounted in 
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
 catch (final Exception deserializationException) {

...
else {
 sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
 return null;
}{quote}
all other paths either return a record or throw

 

 

 

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.
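
As a side note on the deserialization-error path discussed in the comment above: an 
application can already plug in its own handler and keep whatever count it likes, 
independently of the built-in metrics. A minimal sketch follows; the class name and 
the static counter are hypothetical, not part of any proposed metrics work.

{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CountingDeserializationHandler implements DeserializationExceptionHandler {
    // simple process-wide counter of records skipped due to deserialization errors
    public static final AtomicLong SKIPPED = new AtomicLong();

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        SKIPPED.incrementAndGet();
        return DeserializationHandlerResponse.CONTINUE; // skip the record, keep processing
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}
It would be registered via {{default.deserialization.exception.handler}} 
(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG).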



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread siva santhalingam (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418027#comment-16418027
 ] 

siva santhalingam commented on KAFKA-6657:
--

[~guozhang] I have not started on this yet and I don't think I can get to it 
for a couple of weeks. So please reassign if needed.

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: siva santhalingam
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by 
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main 
> consumer, (2) the restore consumer, and (3) the global consumer (which is a 
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different 
> consumers. Thus, we should add two new prefixes for the restore and global 
> consumers. We might also consider extending `KafkaClientSupplier` and adding a 
> `getGlobalConsumer()` method.
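
For reference, a short sketch of how the existing prefixes are used today; the 
restore/global prefixes requested in this ticket do not exist yet, and the property 
values below are purely illustrative.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class PrefixedConfigExample {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // "consumer." prefix: currently applies to every consumer Streams creates
        // (main, restore and global), which is exactly the limitation described above.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
        // "producer." prefix: applies to the embedded producer only.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100);
        return props;
    }
}
{code}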



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418023#comment-16418023
 ] 

John Roesler edited comment on KAFKA-6376 at 3/28/18 7:55 PM:
--

Just doing a little initial archaeology...

 

Thread-level metric looks like it only counts this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) 
timestamp, the record is skipped and not added to the queue for processing
{quote}
 

Ah, but it drills down into 
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we 
have
{quote}final ConsumerRecord record = 
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
 continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
 continue;
}{quote}
All other code paths in there either add the record or throw.

 

The former of these cases is accounted in 
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
 catch (final Exception deserializationException) {

...
else {
 sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
 return null;
}{quote}
all other paths either return a record or throw

 

 

 


was (Author: vvcephei):
Thread-level metric looks like it only counts this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) 
timestamp, the record is skipped and not added to the queue for processing
{quote}
 

Ah, but it drills down into 
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we 
have
{quote}final ConsumerRecord record = 
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
 continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
 continue;
}{quote}
All other code paths in there either add the record or throw.

 

The former of these cases is accounted in 
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {{quote}
 

 

 

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418023#comment-16418023
 ] 

John Roesler commented on KAFKA-6376:
-

Thread-level metric looks like it only counts this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) 
timestamp, the record is skipped and not added to the queue for processing
{quote}
 

Ah, but it drills down into 
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we 
have
{quote}final ConsumerRecord record = 
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
 continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
 continue;
}{quote}
All other code paths in there either add the record or throw.

 

The former of these cases is accounted in 
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {{quote}
 

 

 

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6684) Support casting values with bytes schema to string

2018-03-28 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated KAFKA-6684:
-
Description: 
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Examples:

{{BigDecimal}} will cast to the string representation of the number.

{{Timestamp}} will cast to the string representation of the timestamp, or maybe 
UTC {{mmddTHH:MM:SS.f}} format?

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.

  was:
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.


> Support casting values with bytes schema to string 
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is 
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's 
> {{toString()}}, such that if the object is actually a LogicalType it can be 
> "serialized" as string instead of bytes+schema.
> Examples:
> {{BigDecimal}} will cast to the string representation of the number.
> {{Timestamp}} will cast to the string representation of the timestamp, or 
> maybe UTC {{mmddTHH:MM:SS.f}} format?
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - its 
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
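
To make the proposal concrete, here is a rough sketch of the kind of "cast anything 
to string" behaviour described above. It is illustrative only, not existing Connect 
code; the helper class and method are hypothetical.

{code:java}
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

public final class BytesToString {
    private BytesToString() { }

    // Hypothetical "cast to string" for values read with a BYTES schema:
    // logical types fall back to their toString(), raw bytes to UTF-8 text.
    public static String castToString(final Object value) {
        if (value instanceof byte[]) {
            return new String((byte[]) value, StandardCharsets.UTF_8);
        }
        return String.valueOf(value); // e.g. BigDecimal, java.util.Date
    }

    public static void main(final String[] args) {
        System.out.println(castToString(new BigDecimal("12.34")));                  // 12.34
        System.out.println(castToString("plain".getBytes(StandardCharsets.UTF_8))); // plain
    }
}
{code}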



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6684) Support casting values with bytes schema to string

2018-03-28 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated KAFKA-6684:
-
Description: 
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

 
{noformat}
Examples:
BigDecimal will cast to the string representation of the number.
Timestamp will cast to the string representation of the timestamp, or maybe UTC 
mmddTHH:MM:SS.f format?
{noformat}
 

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.

  was:
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Examples:

{{BigDecimal}} will cast to the string representation of the number.

{{Timestamp}} will cast to the string representation of the timestamp, or maybe 
UTC {{mmddTHH:MM:SS.f}} format?

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.


> Support casting values with bytes schema to string 
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is 
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's 
> {{toString()}}, such that if the object is actually a LogicalType it can be 
> "serialized" as string instead of bytes+schema.
>  
> {noformat}
> Examples:
> BigDecimal will cast to the string representation of the number.
> Timestamp will cast to the string representation of the timestamp, or maybe 
> UTC mmddTHH:MM:SS.f format?
> {noformat}
>  
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - its 
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6684) Support casting values with bytes schema to string

2018-03-28 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated KAFKA-6684:
-
Description: 
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Examples:

{{BigDecimal}} will cast to the string representation of the number.

{{Timestamp}} will cast to the string representation of the timestamp, or maybe 
UTC {{mmddTHH:MM:SS.f}} format?

 

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.

  was:
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Examples:

{{BigDecimal}} will cast to the string representation of the number.

{{Timestamp}} will cast to the string representation of the timestamp, or maybe 
UTC {{mmddTHH:MM:SS.f}} format?

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.


> Support casting values with bytes schema to string 
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is 
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's 
> {{toString()}}, such that if the object is actually a LogicalType it can be 
> "serialized" as string instead of bytes+schema.
> Examples:
> {{BigDecimal}} will cast to the string representation of the number.
> {{Timestamp}} will cast to the string representation of the timestamp, or 
> maybe UTC {{mmddTHH:MM:SS.f}} format?
>  
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - its 
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6684) Support casting values with bytes schema to string

2018-03-28 Thread Amit Sela (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Sela updated KAFKA-6684:
-
Description: 
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Examples:

{{BigDecimal}} will cast to the string representation of the number.

{{Timestamp}} will cast to the string representation of the timestamp, or maybe 
UTC {{mmddTHH:MM:SS.f}} format?

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.

  was:
Casting from BYTES is not supported, which means that casting LogicalTypes is 
not supported.

This proposes to allow casting anything to a string, kind of like Java's 
{{toString()}}, such that if the object is actually a LogicalType it can be 
"serialized" as string instead of bytes+schema.

Examples:

{{BigDecimal}} will cast to the string representation of the number.

{{Timestamp}} will cast to the string representation of the timestamp, or maybe 
UTC {{mmddTHH:MM:SS.f}} format?

 

Worst case, bytes are "casted" to whatever the {{toString()}} returns - its up 
to the user to know the data.

This would help when using a JSON sink, or anything that's not Avro.


> Support casting values with bytes schema to string 
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is 
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's 
> {{toString()}}, such that if the object is actually a LogicalType it can be 
> "serialized" as string instead of bytes+schema.
> Examples:
> {{BigDecimal}} will cast to the string representation of the number.
> {{Timestamp}} will cast to the string representation of the timestamp, or 
> maybe UTC {{mmddTHH:MM:SS.f}} format?
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - its 
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL

2018-03-28 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417959#comment-16417959
 ] 

Cemalettin Koç commented on KAFKA-6713:
---

Hi [~guozhang], 

For 1 and 2)

I would like to access my customized version of `CategoryInMemoryStore`. I would 
be glad if you could write something simple for me; I could not follow your 
comment.

 

3) My category topic is updated rarely. I need all of the data, I am using a 
GlobalKTable, and I do some rendering on my pages. Since it changes 
rarely, I would like to cache it, but in case of an update I would like to 
invalidate this cache. However, currently I could not find a way to be notified 
when my GlobalKTable-based InMemoryStore is updated. If a new Category is 
added or changed in my InMemoryStore, I would like to trigger invalidation of 
my rendering cache.
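
On point 3, one possible workaround today is to register a global store with a custom 
state-update processor instead of a plain GlobalKTable; the processor is invoked for 
every update of the backing topic, which gives a hook to invalidate an 
application-side cache. The sketch below assumes a Streams version with the 
simplified {{StreamsBuilder#addGlobalStore}} overload (1.1+); the store name, topic 
name, String serdes and the RENDER_CACHE map are placeholders for the Category types 
and rendering cache described above.

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreWithInvalidation {

    // stand-in for the page-rendering cache that should be invalidated on updates
    static final ConcurrentMap<String, String> RENDER_CACHE = new ConcurrentHashMap<>();

    public static StreamsBuilder build() {
        final StreamsBuilder builder = new StreamsBuilder();

        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("category-store"),
                    Serdes.String(), Serdes.String())
                .withLoggingDisabled(); // global stores must not use a changelog

        // The state-update processor runs for every record on the "categories" topic,
        // so it can both maintain the store and invalidate the matching cache entry.
        builder.addGlobalStore(storeBuilder, "categories",
            Consumed.with(Serdes.String(), Serdes.String()),
            () -> new AbstractProcessor<String, String>() {
                private KeyValueStore<String, String> store;

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    super.init(context);
                    store = (KeyValueStore<String, String>) context.getStateStore("category-store");
                }

                @Override
                public void process(final String key, final String value) {
                    store.put(key, value);     // keep the global store up to date
                    RENDER_CACHE.remove(key);  // hook: drop any cached rendering for this key
                }
            });

        return builder;
    }
}
{code}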

> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
>  Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, 
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my 
> stores, I would like to store my `Category` entities and I would like to query 
> them by their name as well. My custom store has some capabilities beyond 
> `get` such as get by `name`. I also want to get all entries in a hierarchical 
> way in a lazy fashion. I have other use cases as well.
>  
> In order to accomplish my task I had to implement  a custom 
> `KeyValueBytesStoreSupplier`,  `BytesTypeConverter` and 
>  
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter converter;
>   private KeyValueStore<K, V> delegated;
>   public DelegatingByteStore(KeyValueStore<K, V> delegated, 
> BytesTypeConverter converter) {
> this.converter = converter;
> this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
> delegated.put(converter.outerKey(key),
>   converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
> V v = delegated.putIfAbsent(converter.outerKey(key),
> converter.outerValue(value));
> return v == null ? null : value;
>   }
>   ..
> {code}
>  
>  Type Converter:
> {code:java}
> public interface TypeConverter<IK, K, IV, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>  
> This is unfortunately too cumbersome and hard to maintain.  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417946#comment-16417946
 ] 

Guozhang Wang commented on KAFKA-5882:
--

Filed https://github.com/apache/kafka/pull/4790 to improve the logging 
for those errors.

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduces another issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything other than an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417926#comment-16417926
 ] 

Guozhang Wang commented on KAFKA-5882:
--

Thanks Vikas and Ionut-Maxim for reporting this issue; I think there are indeed 
some bugs lurking around here. To help us further investigate the issue, could 
you provide more logs on the observed scenario? For example, after the 

{code}
Error caught during partition assignment ..
{code}

there should be a {{Failed to rebalance.}} error message which exposes the 
exception type and the stack trace. That can help us better locate which part 
is throwing the exception (since exception.getMessage() returns null, I 
cannot tell if it is a task migration issue or something else).

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduces another issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything other than an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-28 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417922#comment-16417922
 ] 

John Roesler commented on KAFKA-6474:
-

Hey [~h314to],

I hope you're doing well.

FYI, we have just merged [https://github.com/apache/kafka/pull/4760] 
incorporating your pattern for hooking test-utils into the Streams test 
dependencies. You may have conflicts to resolve when you rebase on trunk.

Thanks,

-John

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.
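
For reference, a minimal sketch of the new driver in use; the trivial pass-through 
topology and the topic names are made up for illustration, and it assumes 
kafka-streams-test-utils is on the classpath.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class PassThroughTopologyTest {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output"); // trivial pass-through topology

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        try {
            final ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input", "k", "v"));

            final ProducerRecord<String, String> out =
                driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
            OutputVerifier.compareKeyValue(out, "k", "v");
        } finally {
            driver.close();
        }
    }
}
{code}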



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-03-28 Thread Jagadesh Adireddi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417896#comment-16417896
 ] 

Jagadesh Adireddi commented on KAFKA-5253:
--

Hi [~mjsax],


I am new to Kafka contribution. I made code changes for the above issue and it's 
working as expected.


I have a few questions before I submit a pull request. I made changes inside the 
*private* method 

*KStreamTestDriver#sourceNodeByTopicName*. I haven't modified any method 
signatures or so, just embedded the code below: 
 
{code:java}
final Set<String> sourceTopics = topology.sourceTopics();
for (final String eachSourceTopic : sourceTopics) {
    if (Pattern.compile(eachSourceTopic).matcher(topicName).matches()) {
        return topology.source(eachSourceTopic);
    }
}
{code}
Do I still need to submit a KIP for this change, as I am not touching any public 
methods?

> TopologyTestDriver must handle streams created with patterns
> 
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Underneath is a unit test explaining what I understand should happen, but is 
> failing.
> Explicitly adding a source topic matching the topic pattern, generates an 
> exception as the topology builder explicitly checks overlapping topic names 
> and patterns, in any order of adding pattern and topic. So, it is intended 
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream<String, String> source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier<String, String> processorSupplier = new 
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6376:
---
Affects Version/s: (was: 1.2.0)
   1.0.0

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6657) Add StreamsConfig prefix for different consumers

2018-03-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417858#comment-16417858
 ] 

Guozhang Wang commented on KAFKA-6657:
--

Hello Siva, are you still working on this ticket?

> Add StreamsConfig prefix for different consumers
> 
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: siva santhalingam
>Priority: Major
>  Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by 
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main 
> consumer, (2) the restore consumer, and (3) the global consumer (which is a 
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different 
> consumers. Thus, we should add two new prefixes for the restore and global 
> consumers. We might also consider extending `KafkaClientSupplier` and adding a 
> `getGlobalConsumer()` method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417856#comment-16417856
 ] 

Guozhang Wang commented on KAFKA-6723:
--

The proposal is to have separate prefixes for the restore consumer and the global 
consumer, not for a specific config only. I.e., for any consumer config, users 
can prefix it to differentiate between the different consumers, like:

consumer.poll.ms
restore-consumer.poll.ms
global-consumer.poll.ms
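
If that lands, configuration might look roughly like the sketch below. The 
restore-consumer./global-consumer. prefixes (and the exact property names) are only 
the proposal being discussed here, not existing Streams configs.

{code:java}
import java.util.Properties;

public class ProposedPrefixSketch {
    public static Properties props() {
        final Properties props = new Properties();
        // existing behaviour: "consumer." applies to every consumer Streams creates
        props.put("consumer.max.poll.records", "1000");
        // proposed (hypothetical today): override only the restore consumer
        props.put("restore-consumer.max.poll.records", "5000");
        // proposed (hypothetical today): override only the global consumer
        props.put("global-consumer.max.poll.records", "5000");
        return props;
    }
}
{code}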

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.record` config for both the restore 
> consumer and the normal stream consumer. In reality, they are doing different 
> processing workloads, and in order to speed up restoration, the restore 
> consumer is supposed to have a higher throughput by setting `max.poll.record` 
> higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name), 
> so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6376) Improve Streams metrics for skipped records

2018-03-28 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-6376:

Affects Version/s: (was: 1.0.0)
   1.2.0

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.2.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6724) ConsumerPerformance resets offsets on every startup

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417807#comment-16417807
 ] 

ASF GitHub Bot commented on KAFKA-6724:
---

hachikuji closed pull request #4787: KAFKA-6724 ConsumerPerformance resets 
offsets on every startup
URL: https://github.com/apache/kafka/pull/4787
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala 
b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index a3e60e652c4..7e0dbcbe064 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -157,8 +157,6 @@ object ConsumerPerformance extends LazyLogging {
   def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
 joinStart = System.currentTimeMillis
   }})
-consumer.poll(0)
-consumer.seekToBeginning(Collections.emptyList())
 
 // Now start the benchmark
 val startMs = System.currentTimeMillis


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
>  Issue Type: Bug
>  Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
> Fix For: 1.2.0
>
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for 
> its group on every startup. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6724) ConsumerPerformance resets offsets on every startup

2018-03-28 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6724.

   Resolution: Fixed
Fix Version/s: 1.2.0

> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
>  Issue Type: Bug
>  Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
> Fix For: 1.2.0
>
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for 
> its group on every startup. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6642) Rack aware task assignment in kafka streams

2018-03-28 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417805#comment-16417805
 ] 

Guozhang Wang commented on KAFKA-6642:
--

[~asurana] Thanks for creating this ticket, I was not aware that it had been 
created some time ago. There is another ticket created recently, 
https://issues.apache.org/jira/browse/KAFKA-6718. Could you take a look at 
the proposed PR https://github.com/apache/kafka/pull/4785 and see if it 
aligns with your proposals, and whether you could collaborate with [~_deepakgoyal] 
on that PR?

> Rack aware task assignment in kafka streams
> ---
>
> Key: KAFKA-6642
> URL: https://issues.apache.org/jira/browse/KAFKA-6642
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware 
> replica 
> assignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]).
> This request is to have a similar feature for kafka streams applications. 
> Standby tasks/standby replica assignment in kafka streams is currently not 
> rack aware, and this request is to make it rack aware for better availability.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6538) Enhance ByteStore exceptions with more context information

2018-03-28 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6538:
-
Description: 
In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and 
only have concrete key/value types on outer layers/wrappers of the stores.

For this reason, the innermost {{RocksDBStore}} cannot provide useful error 
messages anymore if a put/get/delete operation fails, as it only handles plain 
bytes.

In addition, the corresponding changelog records sent to the record collectors 
contain byte arrays only, and hence when an error happens, the record collector 
can only display the key but not the value, since it is all bytes:

{code}
[ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl   -
task [2_2] Error sending record (key {"eventId":XXX,"version":123}
value [] timestamp YYY) to topic TTT
due to ...
{code} 

Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with 
corresponding information for which key/value the operation failed in the 
wrapping stores (KeyValueStore, WindowedStore, and SessionStore).

Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} 
exceptions.

  was:
In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and 
only have concrete key/value types on outer layers/wrappers of the stores.

For this reason, the innermost {{RocksDBStore}} cannot provide useful error 
messages anymore if a put/get/delete operation fails, as it only handles plain 
bytes.

Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with 
corresponding information for which key/value the operation failed in the 
wrapping stores (KeyValueStore, WindowedStore, and SessionStore).

Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} 
exceptions.


> Enhance ByteStore exceptions with more context information
> --
>
> Key: KAFKA-6538
> URL: https://issues.apache.org/jira/browse/KAFKA-6538
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: newbie
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and 
> only have concrete key/value types on outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} cannot provide useful error 
> messages anymore if a put/get/delete operation fails, as it only handles plain 
> bytes.
> In addition, the corresponding changelog records sent to the record collectors 
> contain byte arrays only, and hence when an error happens, the record collector 
> can only display the key but not the value, since it is all bytes:
> {code}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl   -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code} 
> Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with 
> corresponding information for which key/value the operation failed in the 
> wrapping stores (KeyValueStore, WindowedStore, and SessionStore).
> Cf https://github.com/apache/kafka/pull/4518 that cleans up {{RocksDBStore}} 
> exceptions.
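As an illustration only, here is a sketch of the suggested direction (the wrapper class below is hypothetical and not part of Kafka Streams; the real fix lives in the wrapping store implementations): the typed outer store catches the inner bytes-store failure and rethrows it with the concrete key and value attached.

{code:java}
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical helper for illustration purposes.
class ContextualKeyValueStore<K, V> {
    private final KeyValueStore<Bytes, byte[]> inner;
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;
    private final String topic;

    ContextualKeyValueStore(final KeyValueStore<Bytes, byte[]> inner,
                            final Serializer<K> keySerializer,
                            final Serializer<V> valueSerializer,
                            final String topic) {
        this.inner = inner;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.topic = topic;
    }

    void put(final K key, final V value) {
        final Bytes rawKey = Bytes.wrap(keySerializer.serialize(topic, key));
        final byte[] rawValue = valueSerializer.serialize(topic, value);
        try {
            inner.put(rawKey, rawValue);
        } catch (final ProcessorStateException e) {
            // rethrow with the typed key/value instead of opaque bytes
            throw new ProcessorStateException(
                "Error while putting key " + key + " value " + value
                    + " into store " + inner.name(), e);
        }
    }
}
{code}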



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5253) TopologyTestDriver must handle streams created with patterns

2018-03-28 Thread Jagadesh Adireddi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadesh Adireddi reassigned KAFKA-5253:


Assignee: Jagadesh Adireddi

> TopologyTestDriver must handle streams created with patterns
> 
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Assignee: Jagadesh Adireddi
>Priority: Major
>  Labels: beginner, needs-kip, newbie
>
> *Context*
>  -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to 
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
>  The unit test of the topology fails because -KStreamTestDriver- 
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
>  Below is a unit test showing what I understand should happen, but it is 
> failing.
> Explicitly adding a source topic matching the topic pattern generates an 
> exception, as the topology builder explicitly checks for overlapping topic 
> names and patterns (in whichever order pattern and topic are added), so this 
> is intended behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source = 
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier processorSupplier = new 
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), 
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
>  If anybody can help in defining the solution, I can create a pull request 
> for this change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6598) Kafka to support using ETCD beside Zookeeper

2018-03-28 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417741#comment-16417741
 ] 

Ismael Juma commented on KAFKA-6598:


This was discussed before when "KIP-30: Allow for brokers to have plug-able 
consensus and meta data storage sub systems" was proposed. It's worth reading 
that discussion for the concerns with a pluggable approach.

> Kafka to support using ETCD beside Zookeeper
> 
>
> Key: KAFKA-6598
> URL: https://issues.apache.org/jira/browse/KAFKA-6598
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, core
>Reporter: Sebastian Toader
>Priority: Major
>
> The current Kafka implementation is bound to {{Zookeeper}} to store its 
> metadata for forming a cluster of nodes (producer/consumer/broker). 
> As Kafka is becoming popular for streaming in various environments where 
> {{Zookeeper}} is either not easy to deploy/manage or there are better 
> alternatives to it, there is a need 
> to run Kafka with a metastore implementation other than {{Zookeeper}}.
> {{etcd}} can provide the same semantics as {{Zookeeper}} for Kafka, and since 
> {{etcd}} is the favorable choice in certain environments (e.g. Kubernetes), 
> Kafka should be able to run with {{etcd}}.
> From the user's point of view it should be straightforward to configure Kafka 
> to use {{etcd}} by simply specifying a connection string that points to an 
> {{etcd}} cluster.
> To avoid introducing instability, the original interfaces should be kept and 
> only the low-level {{Zookeeper}} API calls should be replaced with {{etcd}} 
> API calls in case Kafka is configured 
> to use {{etcd}}.
> In the long run (which is out of scope of this jira) there should be an 
> abstraction layer in Kafka which various metastore implementations would 
> then implement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6723) Separate "max.poll.record" for restore consumer and common consumer

2018-03-28 Thread Boyang Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417697#comment-16417697
 ] 

Boyang Chen commented on KAFKA-6723:


[~mjsax]  [~guozhang] I haven't seen a concrete design for KAFKA-6657. So should I 
expect something like:

default.max.poll.records

restore.max.poll.records

global.max.poll.records

If that one covers these cases by introducing new consumer config parameters, I 
guess this Jira is a duplicate. Btw, I think the default value should be the 
same.
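As a purely hypothetical illustration of that separation (the restore/global config names below do not exist in Kafka; only the plain consumer setting does), the configuration might end up looking roughly like:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class MaxPollRecordsExample {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // today: a single value that applies to the main and the restore consumer alike
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 500);
        // hypothetical, per the discussion above: separate knobs (names are made up)
        props.put("restore.consumer." + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
        props.put("global.consumer." + ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        return props;
    }
}
{code}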

 

> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.records` config for both the restore 
> consumer and the normal stream consumer. In reality, they handle different 
> processing workloads, and in order to speed up restoration, the restore 
> consumer is supposed to achieve higher throughput by setting `max.poll.records` 
> higher. The change involved is trivial: 
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name), 
> so we need a KIP.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6724) ConsumerPerformance resets offsets on every startup

2018-03-28 Thread Alex Dunayevsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Dunayevsky updated KAFKA-6724:
---
Reviewer: Alex Dunayevsky

> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
>  Issue Type: Bug
>  Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for 
> its group on every startup. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6724) ConsumerPerformance resets offsets on every startup

2018-03-28 Thread Alex Dunayevsky (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Dunayevsky updated KAFKA-6724:
---
Reviewer:   (was: Alex Dunayevsky)

> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
>  Issue Type: Bug
>  Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for 
> its group on every startup. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6716) discardChannel should be released in MockSelector#completeSend

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417372#comment-16417372
 ] 

ASF GitHub Bot commented on KAFKA-6716:
---

rajinisivaram closed pull request #4783: KAFKA-6716: Should close the 
`discardChannel` in completeSend
URL: https://github.com/apache/kafka/pull/4783
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java 
b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index bd27d5c6d99..200d5111517 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -124,11 +124,12 @@ private void completeInitiatedSends() throws IOException {
 
 private void completeSend(Send send) throws IOException {
 // Consume the send so that we will be able to send more requests to 
the destination
-ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
-while (!send.completed()) {
-send.writeTo(discardChannel);
+try (ByteBufferChannel discardChannel = new 
ByteBufferChannel(send.size())) {
+while (!send.completed()) {
+send.writeTo(discardChannel);
+}
+completedSends.add(send);
 }
-completedSends.add(send);
 }
 
 private void completeDelayedReceives() {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> discardChannel should be released in MockSelector#completeSend
> --
>
> Key: KAFKA-6716
> URL: https://issues.apache.org/jira/browse/KAFKA-6716
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: huxihx
>Priority: Minor
>
> {code}
> private void completeSend(Send send) throws IOException {
> // Consume the send so that we will be able to send more requests to 
> the destination
> ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
> while (!send.completed()) {
> send.writeTo(discardChannel);
> }
> completedSends.add(send);
> }
> {code}
> The {{discardChannel}} should be closed before returning from the method



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6724) ConsumerPerformance resets offsets on every startup

2018-03-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417369#comment-16417369
 ] 

ASF GitHub Bot commented on KAFKA-6724:
---

rootex- opened a new pull request #4787: KAFKA-6724 ConsumerPerformance resets 
offsets on every startup
URL: https://github.com/apache/kafka/pull/4787
 
 
   Remove consumer offset reset on startup
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ x ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
>  Issue Type: Bug
>  Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for 
> its group on every startup. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6724) ConsumerPerformance resets offsets on every startup

2018-03-28 Thread Alex Dunayevsky (JIRA)
Alex Dunayevsky created KAFKA-6724:
--

 Summary: ConsumerPerformance resets offsets on every startup
 Key: KAFKA-6724
 URL: https://issues.apache.org/jira/browse/KAFKA-6724
 Project: Kafka
  Issue Type: Bug
  Components: core, tools
Affects Versions: 0.11.0.1
Reporter: Alex Dunayevsky


ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for its 
group on every startup. 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6684) Support casting values with bytes schema to string

2018-03-28 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406470#comment-16406470
 ] 

Amit Sela edited comment on KAFKA-6684 at 3/28/18 1:32 PM:
---

[~hachikuji] or [~ewencp] mind taking a look?


was (Author: amitsela):
[~hachikuji] mind taking a look?

> Support casting values with bytes schema to string 
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is 
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's 
> {{toString()}}, such that if the object is actually a LogicalType it can be 
> "serialized" as string instead of bytes+schema.
> Worst case, bytes are "cast" to whatever {{toString()}} returns - it's 
> up to the user to know their data.
> This would help when using a JSON sink, or anything that's not Avro.
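A short, purely illustrative sketch of the proposed usage via the Connect {{Cast}} SMT (topic name and record contents are placeholders); with releases current at the time of this ticket the BYTES case is rejected, which is exactly the gap described above:

{code:java}
import java.util.Collections;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Cast;

public class CastBytesToStringExample {
    public static void main(final String[] args) {
        final Cast.Value<SinkRecord> cast = new Cast.Value<>();
        // whole-value cast spec: just the target type
        cast.configure(Collections.singletonMap("spec", "string"));

        final SinkRecord record = new SinkRecord(
            "some-topic", 0, null, null, Schema.BYTES_SCHEMA, new byte[]{1, 2, 3}, 0L);

        // Today this call fails for BYTES schemas; supporting it is what this ticket proposes.
        final SinkRecord casted = cast.apply(record);
        System.out.println(casted.value());
    }
}
{code}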



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-28 Thread Deepak Goyal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417173#comment-16417173
 ] 

Deepak Goyal commented on KAFKA-6718:
-

Meanwhile, please look at the PR: [https://github.com/apache/kafka/pull/4785] 

> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Assignee: Deepak Goyal
>Priority: Major
>  Labels: needs-kip
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, it 
> provides fault tolerance in that if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
>  but we need something similar for task assignments at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica tasks to be assigned to different racks for 
> fault-tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks <= number of racks), then the cluster will be rack aware, i.e. 
> each replica task is assigned to a different rack.
>  # If (totalTasks > number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, cluster 
> wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG" which 
> helps the StickyPartitionAssignor to assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
>  After that, it also helps to maintain stickiness within the rack.
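Purely for illustration ("rack.id" / RACK_ID_CONFIG is the config proposed in the linked PR, not an existing StreamsConfig option), an application instance might be configured per rack roughly like this:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RackAwareStreamsConfigExample {
    public static Properties propsForRack(final String rackId) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rack-aware-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // one standby per active task => totalTasks = 2, so two racks are enough
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // proposed (hypothetical) config from the linked PR; not part of Apache Kafka
        props.put("rack.id", rackId);
        return props;
    }
}
{code}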



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-28 Thread Ionut-Maxim Margelatu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417158#comment-16417158
 ] 

Ionut-Maxim Margelatu commented on KAFKA-5882:
--

We have been able to consistently reproduce this issue. In our case, the reason 
is the change of the topology between deployments:
 * the existing deployment uses 2 subtopologies (A->B,B->C)
 * the new deployment uses 3 subtopologies (A->B,A'->B,B->C)

Performing a deployment with an updated topology results in a failure. 
Performing a deployment with the same topology works perfectly.

Here are some logs we added to StreamThread in order to figure out what was 
going on:
{noformat}
2018-03-28T09:51:32 srv="mepw" [MEP-StreamThread-2] WARN  
org.apache.kafka.streams.processor.internals.StreamTask 
partition=mepw_internal-47 partitionTopic=mepw_internal 
topology=ProcessorTopology:
   hubSource:
  topics:[mepw_change_event]
  children:  [CDCProcessor]
   CDCProcessor:
  children:  [CDCSink]
   CDCSink:
  topic:mepw_internal

2018-03-28T09:51:32 srv="mepw" [MEP-StreamThread-2] ERROR 
org.apache.kafka.streams.processor.internals.StreamThread stream-thread 
[MEP-StreamThread-2] Error caught during partition assignment, will abort the 
current process and re-throw at the end of rebalance: null
{noformat}
During rebalancing, the Kafka Streams threads in the new app instances are 
assigned to a sub-topology that doesn't necessarily have a source for the 
partitions they were assigned. This is as much as we could figure out; I hope 
it helps other people.

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error anything other than an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  

[jira] [Updated] (KAFKA-6718) Rack Aware Replica Task Assignment for Kafka Streams

2018-03-28 Thread Ashish Surana (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashish Surana updated KAFKA-6718:
-
Description: 
Machines in a data centre are sometimes grouped in racks. Racks provide 
isolation as each rack may be in a different physical location and has its own 
power source. When tasks are properly replicated across racks, it provides 
fault tolerance in that if a rack goes down, the remaining racks can continue 
to serve traffic.
  
 This feature is already implemented in Kafka 
[KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
 but we need something similar for task assignments at the Kafka Streams application layer. 
  
 This feature enables replica tasks to be assigned to different racks for 
fault-tolerance.
 NUM_STANDBY_REPLICAS = x
 totalTasks = x+1 (replica + active)
 # If no rackId is provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks <= number of racks), then the cluster will be rack aware, i.e. 
each replica task is assigned to a different rack.
 # If (totalTasks > number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, cluster 
wide.

We have added another config in StreamsConfig called "RACK_ID_CONFIG" which 
helps the StickyPartitionAssignor to assign tasks in such a way that no two replica 
tasks are on the same rack if possible.
 After that, it also helps to maintain stickiness within the rack.

  was:
Machines in a data centre are sometimes grouped in racks. Racks provide 
isolation as each rack may be in a different physical location and has its own 
power source. When tasks are properly replicated across racks, it provides 
fault tolerance in that if a rack goes down, the remaining racks can continue 
to serve traffic.
  
 This feature is already implemented in Kafka 
[KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
 but we need something similar for task assignments at the Kafka Streams application layer. 
  
 This feature enables replica tasks to be assigned to different racks for 
fault-tolerance.
 NUM_STANDBY_REPLICAS = x
 totalTasks = x+1 (replica + active)
 # If no rackId is provided: the cluster will behave rack-unaware
 # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
 # If (totalTasks >= number of racks), then the cluster will be rack aware, i.e. 
each replica task is assigned to a different rack.
 # If (totalTasks < number of racks), then it will first assign tasks on 
different racks; further tasks will be assigned to the least loaded node, cluster 
wide.

We have added another config in StreamsConfig called "RACK_ID_CONFIG" which 
helps the StickyPartitionAssignor to assign tasks in such a way that no two replica 
tasks are on the same rack if possible.
 After that, it also helps to maintain stickiness within the rack.


> Rack Aware Replica Task Assignment for Kafka Streams
> 
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Assignee: Deepak Goyal
>Priority: Major
>  Labels: needs-kip
>
> Machines in a data centre are sometimes grouped in racks. Racks provide 
> isolation as each rack may be in a different physical location and has its 
> own power source. When tasks are properly replicated across racks, it 
> provides fault tolerance in that if a rack goes down, the remaining racks can 
> continue to serve traffic.
>   
>  This feature is already implemented in Kafka 
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
>  but we need something similar for task assignments at the Kafka Streams 
> application layer. 
>   
>  This feature enables replica tasks to be assigned to different racks for 
> fault-tolerance.
>  NUM_STANDBY_REPLICAS = x
>  totalTasks = x+1 (replica + active)
>  # If no rackId is provided: the cluster will behave rack-unaware
>  # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
>  # If (totalTasks <= number of racks), then the cluster will be rack aware, i.e. 
> each replica task is assigned to a different rack.
>  # If (totalTasks > number of racks), then it will first assign tasks on 
> different racks; further tasks will be assigned to the least loaded node, cluster 
> wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG" which 
> helps the StickyPartitionAssignor to assign tasks in such a way that no two 
> replica tasks are on the same rack if possible.
>  After that, it also helps to maintain stickiness within the rack.



--

[jira] [Commented] (KAFKA-5882) NullPointerException in StreamTask

2018-03-28 Thread Ionut-Maxim Margelatu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417038#comment-16417038
 ] 

Ionut-Maxim Margelatu commented on KAFKA-5882:
--

We are encountering the same issue with 0.11.0.2.

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073] 
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error anything other than an NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener 
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
>  ~[myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
> [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  [myapp-streamer.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
>  [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO  StreamThread:1040 - stream-thread 
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO  KafkaProducer:972 - Closing the Kafka producer with 
> timeoutMillis = 9223372036854775807 ms.
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6649) ReplicaFetcher stopped after non fatal exception is thrown

2018-03-28 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416912#comment-16416912
 ] 

Srinivas Dhruvakumar commented on KAFKA-6649:
-

[~hachikuji] Still hitting the bug after testing with the patch. 

> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic 
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
>  at scala.Option.foreach(Option.scala:257)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
>  at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 2098535 of partition 
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0, 
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like after the ReplicaFetcherThread is stopped, the replicas 
> start to lag behind, presumably because we are not fetching from the leader 
> anymore. Examining the ShutdownableThread.scala object further:
> {noformat}
> override def run(): Unit = {
>  info("Starting")
>  try {
>while (isRunning)
>  doWork()
>  } catch {
>case e: FatalExitError =>
>  shutdownInitiated.countDown()
>  shutdownComplete.countDown()
>  info("Stopped")
>  Exit.exit(e.statusCode())
>case e: Throwable =>
>  if (isRunning)
>error("Error due to", e)
>  } finally {
>shutdownComplete.countDown()
>  }
>  info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the 
> thread stops doing work. I am not sure whether this is the intended behavior 
> of the ShutdownableThread, or whether the exception should be caught and 
> doWork() should keep being called.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions

2018-03-28 Thread Srinivas Dhruvakumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416862#comment-16416862
 ] 

Srinivas Dhruvakumar commented on KAFKA-6666:
-

[~huxi_2b] Tried the latest patch from KAFKA-3978, still hitting this bug. 

> OffsetOutOfRangeException: Replica Thread Stopped Resulting in 
> Underreplicated Partitions
> -
>
> Key: KAFKA-6666
> URL: https://issues.apache.org/jira/browse/KAFKA-6666
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All, 
> Currently we are seeing a few under-replicated partitions on our test cluster 
> which is used for integration testing. On further debugging we found the replica 
> thread was stopped due to an error: 
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot 
> increment the log start offset to 50 of partition  since it is larger 
> than the high watermark -1
> Kindly find the attached screenshot. 
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)