[
https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Di Shang updated KAFKA-6706:
Description:
We have 2 clusters A and B with 4 brokers each; we use MirrorMaker to replicate
topics from A to B.
We recently upgraded our brokers from 0.10.2.0 to 1.0.0. After the upgrade we
started seeing the MirrorMaker task logging producer errors and intermittently
dying.
We tried both the 1.0.0 and the 0.10.2.0 MirrorMaker; both have the same problem.
After downgrading the cluster B brokers back to 0.10.2.0 the problem went away,
so we think it's a server-side problem.
There are 2 types of errors: REQUEST_TIMED_OUT and NETWORK_EXCEPTION. For
testing, I used a topic *logging* with 20 partitions and 3 replicas (same on
clusters A and B); the source topic has 50+ million messages.
(This is from MirrorMaker 1.0 at info level; the 0.10.2.0 log is very similar.)
{noformat}
22 Mar 2018 02:16:07.407 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 35122 on
topic-partition logging-7, retrying (2147483646 attempts left). Error:
REQUEST_TIMED_OUT
22 Mar 2018 02:17:49.731 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 51572 on
topic-partition logging-7, retrying (2147483646 attempts left). Error:
REQUEST_TIMED_OUT
22 Mar 2018 02:18:33.903 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 57785 on
topic-partition logging-5, retrying (2147483646 attempts left). Error:
REQUEST_TIMED_OUT
22 Mar 2018 02:21:21.399 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 85406 on
topic-partition logging-18, retrying (2147483646 attempts left). Error:
REQUEST_TIMED_OUT
22 Mar 2018 02:25:22.278 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 128047 on
topic-partition logging-5, retrying (2147483646 attempts left). Error:
REQUEST_TIMED_OUT
22 Mar 2018 02:26:17.154 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 137049 on
topic-partition logging-18, retrying (2147483646 attempts left). Error:
REQUEST_TIMED_OUT
22 Mar 2018 02:27:57.358 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 153976 on
topic-partition logging-5, retrying (2147483646 attempts left). Error:
REQUEST_TIMED_OUT
22 Mar 2018 02:27:57.779 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 154077 on
topic-partition logging-2, retrying (2147483646 attempts left). Error:
NETWORK_EXCEPTION
22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 154077 on
topic-partition logging-10, retrying (2147483646 attempts left). Error:
NETWORK_EXCEPTION
22 Mar 2018 02:27:57.780 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 154077 on
topic-partition logging-18, retrying (2147483646 attempts left). Error:
NETWORK_EXCEPTION
22 Mar 2018 02:27:57.781 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 154077 on
topic-partition logging-14, retrying (2147483646 attempts left). Error:
NETWORK_EXCEPTION
22 Mar 2018 02:27:57.781 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
clientId=producer-1] Got error produce response with correlation id 154077 on
topic-partition logging-6, retrying (2147483646 attempts left). Error:
NETWORK_EXCEPTION
22 Mar 2018 02:29:12.378 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender warn(line:251) [Producer
{noformat}
[
https://issues.apache.org/jira/browse/KAFKA-6706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Di Shang updated KAFKA-6706:
Priority: Blocker (was: Major)
[
https://issues.apache.org/jira/browse/KAFKA-6717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418435#comment-16418435
]
Ewen Cheslack-Postava commented on KAFKA-6717:
--
Are both assigned the partition in the same generation? As consumers join the
group, it will rebalance and change the assignment. If you are just looking at
which consumers are assigned which partitions, it could appear that two of them
are assigned the same partitions at the same time.
I see you are using the NoOpConsumerRebalanceListener. With this, you wouldn't
see when partitions were assigned or revoked. What are you doing to verify that
both consumer instances are assigned the same partitions at the same time?
Without correct handling of partition assignments and revocation, you
definitely could see data processed twice. In fact, without additional steps
taken to ensure no duplicates, *at least once* handling is what Kafka consumers
would normally provide as long as they handle offset commits properly.
> TopicPartition Assigned twice to a consumer group for 2 consumer instances
> --
>
> Key: KAFKA-6717
> URL: https://issues.apache.org/jira/browse/KAFKA-6717
> Project: Kafka
> Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Yuancheng PENG
>Priority: Major
>
> I'm using {{StickyAssignor}} for consuming more than 100 topics with a certain
> pattern.
> There are 10 consumers with the same group id.
> I expected each topic-partition to be assigned to only one consumer instance.
> However some topic-partitions are assigned to 2 different instances, hence the
> consumer group processes duplicate messages.
> {code:java}
> props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
> Collections.singletonList(StickyAssignor.class));
> KafkaConsumer<String, String> c = new KafkaConsumer<>(props);
> c.subscribe(Pattern.compile(TOPIC_PATTERN), new
> NoOpConsumerRebalanceListener());
> {code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
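One way to act on the suggestion in the comment above — record each consumer's assignment with a real {{ConsumerRebalanceListener}} instead of {{NoOpConsumerRebalanceListener}}, then compare only snapshots taken from the same rebalance generation — is an overlap check like the following stdlib-only sketch (class and method names are hypothetical, not part of the Kafka API):

```java
import java.util.*;

public class AssignmentOverlap {
    // Given each consumer's assigned partitions (snapshots that must all come
    // from the same rebalance generation, e.g. captured in
    // onPartitionsAssigned), return any partition claimed by more than one
    // consumer. A non-empty result for same-generation snapshots would
    // indicate a genuine double assignment.
    static Set<String> overlapping(Map<String, Set<String>> assignments) {
        Set<String> seen = new HashSet<>();
        Set<String> dup = new HashSet<>();
        for (Set<String> partitions : assignments.values()) {
            for (String tp : partitions) {
                if (!seen.add(tp)) {
                    dup.add(tp);
                }
            }
        }
        return dup;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> snapshot = new HashMap<>();
        snapshot.put("consumer-1", new HashSet<>(Arrays.asList("t-0", "t-1")));
        snapshot.put("consumer-2", new HashSet<>(Arrays.asList("t-1", "t-2")));
        System.out.println(overlapping(snapshot)); // prints [t-1]
    }
}
```

Snapshots taken across different generations will naturally overlap as partitions move between consumers, which is why comparing raw "who has what" views can look like a double assignment when it is not.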
[
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418337#comment-16418337
]
Boyang Chen commented on KAFKA-6657:
[~guozhang] Thanks Guozhang! I will take a look at this one.
> Add StreamsConfig prefix for different consumers
>
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Boyang Chen
>Priority: Major
> Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main
> consumer, (2) the restore consumer, and (3) the global consumer (which is a
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different
> consumers. Thus, we should add two new prefixes for the restore and global
> consumers. We might also consider extending `KafkaClientSupplier` and adding a
> `getGlobalConsumer()` method.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
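The prefix mechanism the issue describes can be illustrated with plain string handling: a key carrying the client's prefix wins (with the prefix stripped), while keys with no known prefix act as shared defaults. This is a stdlib-only sketch, not the actual StreamsConfig implementation, and the `restore.consumer.` and `global.consumer.` prefixes are the issue's proposal, not an existing config:

```java
import java.util.*;

public class ClientConfigRouting {
    // Extract the effective configs for one client. A key carrying the
    // requested prefix wins (prefix stripped); keys carrying no known prefix
    // are shared defaults for every client; keys carrying some other client's
    // prefix are excluded.
    static Map<String, String> configsFor(String prefix,
                                          Map<String, String> all,
                                          Set<String> knownPrefixes) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : all.entrySet()) {
            String key = e.getKey();
            boolean prefixed = knownPrefixes.stream().anyMatch(key::startsWith);
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), e.getValue());
            } else if (!prefixed) {
                out.putIfAbsent(key, e.getValue()); // shared default
            }
        }
        return out;
    }
}
```

With the proposed prefixes, `configsFor("restore.consumer.", all, knownPrefixes)` would let a restore consumer override, say, `max.poll.records` without touching the main consumer's value.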
[
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Boyang Chen reassigned KAFKA-6657:
--
Assignee: Boyang Chen
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
Matt Farmer created KAFKA-6725:
--
Summary: Indicate "isClosing" in the SinkTaskContext
Key: KAFKA-6725
URL: https://issues.apache.org/jira/browse/KAFKA-6725
Project: Kafka
Issue Type: New Feature
Reporter: Matt Farmer
Addition of the isClosing method to SinkTaskContext per this KIP.
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75977607
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-6376:
---
Description:
Copy this from KIP-210 discussion thread:
{quote}
Note that currently we have two metrics for `skipped-records` on different
levels:
1) on the highest level, the thread-level, we have a `skipped-records`,
that records all the skipped records due to deserialization errors.
2) on the lower processor-node level, we have a
`skippedDueToDeserializationError`, that records the skipped records on
that specific source node due to deserialization errors.
So you can see that 1) does not cover any other scenarios and can just be
thought of as an aggregate of 2) across all the tasks' source nodes.
However, there are other places that can cause a record to be dropped, for
example:
1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
dropped due to window elapsed.
2) KIP-210: records could be dropped on the producer side.
3) records could be dropped during user-customized processing on errors.
{quote}
[~guozhang] Not sure what you mean by "3) records could be dropped during
user-customized processing on errors."
Btw: we also drop records with {{null}} key and/or value for certain DSL
operations. This should be included as well.
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics
> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
> Issue Type: Improvement
> Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
> Labels: kip
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
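The relationship described in the quoted discussion — the thread-level `skipped-records` metric being, today, just an aggregate of the per-source-node `skippedDueToDeserializationError` counts — can be illustrated with a stdlib-only sketch (the names and data layout here are illustrative, not the Streams metrics API):

```java
import java.util.*;

public class SkippedRecords {
    // Per-node skip counts, keyed first by drop reason, then by source node.
    // Today's metrics only cover the "deserialization-error" reason; KIP-274
    // proposes counting the other drop paths (expired windows, producer-side
    // drops, null keys/values in DSL operations) as well.
    static long threadLevelTotal(Map<String, Map<String, Long>> skipsByReason,
                                 String reason) {
        return skipsByReason.getOrDefault(reason, Map.of())
                            .values().stream()
                            .mapToLong(Long::longValue)
                            .sum();
    }
}
```

Under this view, the thread-level metric adds nothing beyond the node-level ones until the additional drop reasons are also counted, which is exactly the gap the KIP addresses.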
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-6376:
---
Labels: kip (was: needs-kip)
> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
> Issue Type: Improvement
> Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
> Labels: kip
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418207#comment-16418207
]
Matthias J. Sax commented on KAFKA-6437:
I mean that some topics are available but others are not (i.e. if there are
multiple input topics). There are cases for which Kafka Streams would not fail
but just process the available topics atm.
I agree that KAFKA-6520 is different; however, it's somehow related (-> state
"RUNNING" is confusing and not really appropriate). Just wanted to point out
the relationship. Not sure if we should introduce DISCONNECTED and IDLE or
just one state for both. I mentioned it to get a "global picture" only.
> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
> Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left-side topic is missing upon starting the streams application, it
> hangs "in the middle" of the topology (at …9, see below). Only parts of
> the intermediate topics are created (up to …9).
> When the missing input topic is created, the streams application resumes
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
> ProcessorTopology:
> KSTREAM-SOURCE-11:
> topics:
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
> children: [KTABLE-AGGREGATE-12]
> KTABLE-AGGREGATE-12:
> states:
> [KTABLE-AGGREGATE-STATE-STORE-09]
> children: [KTABLE-TOSTREAM-20]
> KTABLE-TOSTREAM-20:
> children: [KSTREAM-SINK-21]
> KSTREAM-SINK-21:
> topic: data_udr_month_customer_aggregration
> KSTREAM-SOURCE-17:
> topics:
> [mystreams_app-KSTREAM-MAP-14-repartition]
> children: [KSTREAM-LEFTJOIN-18]
> KSTREAM-LEFTJOIN-18:
> states:
> [KTABLE-AGGREGATE-STATE-STORE-09]
> children: [KSTREAM-SINK-19]
> KSTREAM-SINK-19:
> topic: data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0,
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the
> missing input topic. This preprocessing won't happen without the topic,
> creating a huge backlog of data.
> *Fix*
> Issue a `warn`- or `error`-level message at start to inform about the missing
> topic and its consequences.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
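The fix proposed in the issue — warn at startup instead of hanging silently — amounts to a set difference between the topology's required input topics and the topics that actually exist. A stdlib-only sketch (in a real application the `existing` set would come from something like `AdminClient.listTopics()`; the class and method names here are hypothetical):

```java
import java.util.*;

public class InputTopicCheck {
    // Compare the topology's required input topics against the topics that
    // actually exist, so the difference can be surfaced as a warning rather
    // than the application hanging mid-topology.
    static List<String> missingTopics(Collection<String> required,
                                      Set<String> existing) {
        List<String> missing = new ArrayList<>();
        for (String topic : required) {
            if (!existing.contains(topic)) {
                missing.add(topic);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingTopics(
                Arrays.asList("left-input", "right-input"),
                new HashSet<>(Collections.singleton("right-input")));
        if (!missing.isEmpty()) {
            System.err.println("WARN: missing input topics: " + missing
                    + "; the topology will not make progress until they exist");
        }
    }
}
```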
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418195#comment-16418195
]
John Roesler commented on KAFKA-6376:
-
I have created a KIP to solve these issues:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics
> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
> Issue Type: Improvement
> Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
> Labels: needs-kip
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418185#comment-16418185
]
Mariam John commented on KAFKA-6437:
[~mjsax] What do you mean by partially available input topics? KAFKA-6520 deals
with the case where the Kafka broker is down but the Kafka Streams apps
connected to it still show the state RUNNING. I think, like you suggested, we
could have an IDLE state in both cases and log a different warning for the
different cases. For example, in this case it would be because of missing input
topics, and in the case of KAFKA-6520 it would be because it is unable to
connect to the broker.
> Streams does not warn about missing input topics, but hangs
> ---
>
> Key: KAFKA-6437
> URL: https://issues.apache.org/jira/browse/KAFKA-6437
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.0.0
> Environment: Single client on single node broker
>Reporter: Chris Schwarzfischer
>Assignee: Mariam John
>Priority: Minor
> Labels: newbie
>
> *Case*
> Streams application with two input topics being used for a left join.
> When the left side topic is missing upon starting the streams application, it
> hangs "in the middle" of the topology (at …9, see below). Only parts of
> the intermediate topics are created (up to …9)
> When the missing input topic is created, the streams application resumes
> processing.
> {noformat}
> Topology:
> StreamsTask taskId: 2_0
> ProcessorTopology:
> KSTREAM-SOURCE-11:
> topics:
> [mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition]
> children: [KTABLE-AGGREGATE-12]
> KTABLE-AGGREGATE-12:
> states:
> [KTABLE-AGGREGATE-STATE-STORE-09]
> children: [KTABLE-TOSTREAM-20]
> KTABLE-TOSTREAM-20:
> children: [KSTREAM-SINK-21]
> KSTREAM-SINK-21:
> topic: data_udr_month_customer_aggregration
> KSTREAM-SOURCE-17:
> topics:
> [mystreams_app-KSTREAM-MAP-14-repartition]
> children: [KSTREAM-LEFTJOIN-18]
> KSTREAM-LEFTJOIN-18:
> states:
> [KTABLE-AGGREGATE-STATE-STORE-09]
> children: [KSTREAM-SINK-19]
> KSTREAM-SINK-19:
> topic: data_UDR_joined
> Partitions [mystreams_app-KSTREAM-MAP-14-repartition-0,
> mystreams_app-KTABLE-AGGREGATE-STATE-STORE-09-repartition-0]
> {noformat}
> *Why this matters*
> The application does quite a lot of preprocessing before joining with the
> missing input topic. This preprocessing won't happen without the topic,
> creating a huge backlog of data.
> *Fix*
> Issue a `warn`- or `error`-level message at start to inform about the missing
> topic and its consequences.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[
https://issues.apache.org/jira/browse/KAFKA-6417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418143#comment-16418143
]
Chris Egerton commented on KAFKA-6417:
--
[~cotedm] Alright, took another look at the Connect code base and have some bad
news :( It looks like there are enough different places that the
{{ClassNotFoundException}} can be thrown from at enough different times that
catching it and displaying a useful error message would be harder than I
originally thought. On top of that, it would be pretty difficult to
differentiate between "It looks like this JAR is missing because you specified
an incorrect plugin path" and "This JAR is actually missing" in most cases,
since the exception will have occurred after the initial plugin class was
loaded and outside any code that deals with plugin isolation.
As far as forcing all JAR files to go into their own directories goes, I have a
few more thoughts. If someone tries to start up Connect, gets an error about
JAR files found on the plugin path (possibly also with the information that
"All JAR files must be placed in a subdirectory of a plugin path directory" or
something along those lines), and wants to fix the issue, do you think it's
likely that their next step would be to create a subdirectory and place all of
their uber JARs inside that directory? It may seem like a
reasonable assumption that uber JARs would be loaded independently of each
other, but if we don't have a meaningful way of differentiating them from
non-uber JARs in the Connect framework, we'd just end up loading all of the
plugins in that directory with the same classloader, potentially leading to the
same overlapping dependency issues that the whole {{plugin.path}} configuration
is meant to prevent.
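To make the layout contract under discussion concrete, here is a sketch of the on-disk arrangement, reusing the /usr/share/plugins paths from the issue; the JAR names are illustrative:

```text
/usr/share/plugins/                 <- plugin.path should point HERE
    myplugin1/
        myplugin1.jar               <- plugin classes
        some-dependency.jar         <- its dependencies, same subdirectory
    myplugin2/
        myplugin2-uber.jar          <- an uber JAR in its own subdirectory

# worker config -- correct:
plugin.path=/usr/share/plugins

# incorrect (the failure mode in this ticket: dependencies of myplugin1
# are not loaded properly, yielding ClassNotFoundException):
# plugin.path=/usr/share/plugins/myplugin1
```

With this layout each subdirectory gets its own plugin classloader, which is exactly the isolation that collapses when several uber JARs share one directory.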
> plugin.path pointing at a plugin directory causes ClassNotFoundException
>
>
> Key: KAFKA-6417
> URL: https://issues.apache.org/jira/browse/KAFKA-6417
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Dustin Cote
>Priority: Major
>
> When using the {{plugin.path}} configuration for the Connect workers, the
> user is expected to specify a list containing the following per the docs:
> {quote}
> The list should consist of top level directories that include any combination
> of: a) directories immediately containing jars with plugins and their
> dependencies b) uber-jars with plugins and their dependencies c) directories
> immediately containing the package directory structure of classes of plugins
> and their dependencies
> {quote}
> This means we would expect {{plugin.path=/usr/share/plugins}} for a structure
> like {{/usr/share/plugins/myplugin1}},{{/usr/share/plugins/myplugin2}}, etc.
> However if you specify {{plugin.path=/usr/share/plugins/myplugin1}} the
> resulting behavior is that dependencies for {{myplugin1}} are not properly
> loaded. This causes a {{ClassNotFoundException}} that is not intuitive to
> debug.
[
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418131#comment-16418131
]
Matthias J. Sax commented on KAFKA-6437:
[~wojda] Thanks for the follow-up. With regard to the "RUNNING" status, it
seems to be related to KAFKA-6520.
About this ticket and KAFKA-6720: I agree that it is opinion-based whether
failing or logging is the right approach. Note that Kafka Streams inherits its
behavior from KafkaConsumer: it also idles if the input topics don't exist.
Therefore, it might even be required to change the consumer, affecting even
more developers. Maybe [~guozhang] can shed some light on why the consumer is
designed this way (I am sure there are good reasons for it).
About adding a new config: it might be a solution. However, from our
experience we learned that having too many configs can be confusing for users
-- thus, we tend to be conservative about adding new configs if there is a
better solution. I am not saying we should not introduce a
"fail-on-missing-topic" config, I am just saying we should discuss it in
detail before we make a decision. Do you think that fixing KAFKA-6520 would be
an acceptable alternative to throwing an exception? Meaning, introducing a new
"IDLE" state if the input topics are missing? The question would be how to
handle partially available input topics. (Just putting out ideas here...)
[
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418117#comment-16418117
]
Guozhang Wang commented on KAFKA-6657:
--
Thanks [~sssanthalingam], I'm unassigning since the reporter of
https://issues.apache.org/jira/browse/KAFKA-6723 would like to work on it.
> Add StreamsConfig prefix for different consumers
>
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Major
> Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main
> consumer, (2) the restore consumer, and (3) the global consumer (which is a
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different
> consumers. Thus, we should add two new prefixes, for the restore and global
> consumers. We might also consider extending `KafkaClientSupplier` with a
> `getGlobalConsumer()` method.
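The prefix routing described above can be sketched without the Streams classes. This is a simplified, hypothetical version of what StreamsConfig does for the existing `producer.`/`consumer.` prefixes; the `restore.consumer.` prefix shown is the *proposal* of this ticket, not an existing config name:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch of prefix-based config routing, in the spirit of
// StreamsConfig's handling of "producer." and "consumer.". The
// "restore.consumer." prefix models the proposal in this ticket and is
// NOT an existing Kafka config.
public class PrefixedConfigs {

    // Keeps only the entries carrying the given prefix, with the prefix removed.
    static Map<String, Object> withPrefixStripped(Map<String, Object> configs,
                                                  String prefix) {
        final Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> e : configs.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        configs.put("consumer.max.poll.records", 100);
        configs.put("restore.consumer.max.poll.records", 2000); // proposed prefix
        configs.put("producer.linger.ms", 50);

        // Each internal client would receive only its own, de-prefixed configs.
        System.out.println(withPrefixStripped(configs, "restore.consumer."));
    }
}
```

Each internal consumer would then be constructed from its own de-prefixed map, which is why distinct prefixes for the restore and global consumers would make per-client tuning possible.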
[
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang reassigned KAFKA-6657:
Assignee: (was: siva santhalingam)
[
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang resolved KAFKA-6723.
--
Resolution: Duplicate
> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.records` config for both the
> restore consumer and the normal stream consumer. In reality, they perform
> different processing workloads, and in order to speed up restoration, the
> restore consumer should achieve higher throughput by setting
> `max.poll.records` higher. The change involved is trivial:
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name),
> so we need a KIP.
[
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418114#comment-16418114
]
Guozhang Wang commented on KAFKA-6723:
--
Boyang, I'm going to mark this ticket as subsumed by KAFKA-6657; please feel
free to take KAFKA-6657 over, as Siva does not have time to work on it lately.
[
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418111#comment-16418111
]
Matthias J. Sax commented on KAFKA-5253:
Thanks for working on this. If the fix does not require a public API change,
we don't need a KIP -- when I created the KIP, I was assuming that we would
need a public API change for the fix. Please go ahead and open the PR and we
can take it from there.
> TopologyTestDriver must handle streams created with patterns
>
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Assignee: Jagadesh Adireddi
>Priority: Major
> Labels: beginner, needs-kip, newbie
>
> *Context*
> -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
> The unit test of the topology fails because -KStreamTestDriver-
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
> Underneath is a unit test explaining what I understand should happen, but is
> failing.
> Explicitly adding a source topic matching the topic pattern generates an
> exception, as the topology builder explicitly checks for overlapping topic
> names and patterns (in whichever order pattern and topic are added), so that
> part is intended behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
>     // -- setup stream pattern
>     final KStream<String, String> source =
>         builder.stream(Pattern.compile("topic-source-\\d"));
>     source.to("topic-sink");
>
>     // -- setup processor to capture results
>     final MockProcessorSupplier<String, String> processorSupplier =
>         new MockProcessorSupplier<>();
>     source.process(processorSupplier);
>
>     // -- add source to stream data from
>     // builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME),
>     //     "topic-source-3");
>
>     // -- build test driver
>     driver = new KStreamTestDriver(builder);
>     driver.setTime(0L);
>
>     // -- test
>     driver.process("topic-source-3", "A", "aa");
>
>     // -- validate: no exception was thrown
>     assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request
> for this change.
[
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418106#comment-16418106
]
Matthias J. Sax commented on KAFKA-5882:
Changing the structure of the topology is not a compatible change. Are you
doing rolling-bounce upgrades? For this case, a failure is expected. You would
need to take the whole application offline (i.e., shut down all old instances)
and then redeploy the new application. For this offline upgrade, it should
work. Or do you see the NPE even when you do an offline upgrade?
> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073]
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in the case of a FATAL error, anything other than an
> NPE would be thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with
> timeoutMillis = 9223372036854775807 ms.
> {code}
[
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418101#comment-16418101
]
Vikas Tikoo commented on KAFKA-5882:
Guozhang Wang: Here's the rest of the log trace:
{code}
StreamsException
stream-thread
[pipeline-helper-e7f04112-3f80-49a2-8585-f5ecf63b009a-StreamThread-1] Failed to
rebalance.
org.apache.kafka.streams.processor.internals.StreamThread in pollRequests at
line 589
org.apache.kafka.streams.processor.internals.StreamThread in runLoop at line
553
org.apache.kafka.streams.processor.internals.StreamThread in run at line 527
{code}
[
https://issues.apache.org/jira/browse/KAFKA-6598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418097#comment-16418097
]
Ismael Juma commented on KAFKA-6598:
Here's a link:
https://lists.apache.org/thread.html/2bc187040051008452b40b313db06b476c248ef7a5ed7529afe7b118@1448997154@%3Cdev.kafka.apache.org%3E
> Kafka to support using ETCD beside Zookeeper
>
>
> Key: KAFKA-6598
> URL: https://issues.apache.org/jira/browse/KAFKA-6598
> Project: Kafka
> Issue Type: New Feature
> Components: clients, core
>Reporter: Sebastian Toader
>Priority: Major
>
> The current Kafka implementation is bound to {{Zookeeper}} to store its
> metadata for forming a cluster of nodes (producer/consumer/broker).
> As Kafka is becoming popular for streaming in various environments where
> {{Zookeeper}} is either not easy to deploy/manage or there are better
> alternatives to it, there is a need to run Kafka with a metastore
> implementation other than {{Zookeeper}}.
> {{etcd}} can provide the same semantics as {{Zookeeper}} for Kafka, and since
> {{etcd}} is the favorable choice in certain environments (e.g. Kubernetes),
> Kafka should be able to run with {{etcd}}.
> From the user's point of view, it should be straightforward to configure
> Kafka to use {{etcd}} by simply specifying a connection string that points to
> an {{etcd}} cluster.
> To avoid introducing instability, the original interfaces should be kept and
> only the low-level {{Zookeeper}} API calls should be replaced with {{etcd}}
> API calls in case Kafka is configured to use {{etcd}}.
> In the long run (which is out of scope of this jira) there should be an
> abstraction layer in Kafka which various metastore implementations would
> then implement.
[
https://issues.apache.org/jira/browse/KAFKA-6720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418090#comment-16418090
]
Daniel Wojda commented on KAFKA-6720:
-
My mistake [~johnma]. That makes sense. Thank you for explanation!
> Inconsistent Kafka Streams behaviour when topic does not exist
> --
>
> Key: KAFKA-6720
> URL: https://issues.apache.org/jira/browse/KAFKA-6720
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 1.0.1
>Reporter: Daniel Wojda
>Priority: Minor
>
> When Kafka Streams starts, it reads metadata about the topics used in the
> topology and their partitions. If the topology contains a stateful operation
> like #join and a topic does not exist, a
> [TopologyBuilderException|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L719]
> will be thrown.
> For streams with a simple topology containing only stateless operations,
> like #mapValues, Kafka Streams does not throw any exception when a topic
> does not exist; it just logs a warning:
> ["log.warn("No partitions found for topic {}",
> topic);"|https://github.com/apache/kafka/blob/5bdfbd13524da667289cacb774bb92df36a253f2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java#L435]
>
> I believe the behaviour of Kafka Streams in both cases should be the same:
> it should throw a TopologyBuilderException.
> I am more than happy to prepare a Pull Request if it is a valid issue.
>
>
[
https://issues.apache.org/jira/browse/KAFKA-6720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418089#comment-16418089
]
Mariam John commented on KAFKA-6720:
[~wojda] if you look at the resolved tag, it is marked as duplicate. The only
way to mark it as duplicate is to resolve it as duplicate. If you look at
KAFKA-6437, you will see some comments that [~guozhang] added yesterday. We
will use that defect to resolve this defect as well as KAFKA-6437. Hope that
makes sense. I will add a comment as soon as I upload a fix. Thank you.
[
https://issues.apache.org/jira/browse/KAFKA-6437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418073#comment-16418073
]
Daniel Wojda commented on KAFKA-6437:
-
I would like to add my comment as a user of Kafka Streams and author of
KAFKA-6720.
An important piece of missing information here is that if you start a Kafka
Streams application without the input topics created, it will log a warning
and stay in this "idle" state until you create the topic(s) *AND* a
rebalancing happens. If you check the status of the stream, it will be
"RUNNING". What is more, please correct me if I'm wrong, checking consumer lag
will not help either, because the lag will be 0 (the number of messages in a
non-existent topic is 0).
As [~mjsax] already mentioned, "it's well documented that you need to create
all input topics before you start your application", so in my opinion
"stopping the world and failing" is a better option than starting a "zombie"
application.
I understand that Kafka Streams has many users and other developers may have a
different opinion, but in that case I'd suggest introducing a new config:
"fail-on-missing-topic"? WDYT?
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418023#comment-16418023
]
John Roesler edited comment on KAFKA-6376 at 3/28/18 8:00 PM:
--
Just doing a little initial archaeology...
The thread-level metric is updated here:
org.apache.kafka.streams.processor.internals.StreamThread#addRecordsToTasks:
{quote}streamsMetrics.skippedRecordsSensor.record(records.count() -
numAddedRecords, timerStartedMs);{quote}
records.count() - numAddedRecords counts this:
org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative)
timestamp, the record is skipped and not added to the queue for processing
{quote}
Ah, but it drills down into
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we
have
{quote}final ConsumerRecord
[
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418027#comment-16418027
]
siva santhalingam commented on KAFKA-6657:
--
[~guozhang] I have not started on this yet and I don't think I can get to it
for a couple of weeks. So please reassign if needed.
> Add StreamsConfig prefix for different consumers
>
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: siva santhalingam
>Priority: Major
> Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main
> consumer, (2) the restore consumer, and (3) the global consumer (that is a
> restore consumer as well atm).
> For some use cases, it's required to set different configs for different
> consumers. Thus, we should add two new prefixes for the restore and global
> consumers. We might also consider extending `KafkaClientSupplier` and adding
> a `getGlobalConsumer()` method.
--
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418023#comment-16418023
]
John Roesler edited comment on KAFKA-6376 at 3/28/18 7:55 PM:
--
Just doing a little initial archaeology...
Thread-level metric looks like it only counts this:
org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative)
timestamp, the record is skipped and not added to the queue for processing
{quote}
Ah, but it drills down into
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we
have
{quote}final ConsumerRecord record =
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
continue;
}{quote}
All other code paths in there either add the record or throw.
The former of these cases is accounted in
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {
...
else {
sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
return null;
}{quote}
all other paths either return a record or throw
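The two-counter accounting described above can be sketched in isolation. This is a simplified model with illustrative names, not the actual Streams metrics code: a thread-level counter records everything not added (whatever the cause), while a node-level counter is fed only by deserialization failures.

```java
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

// Simplified model of the two skip paths: all names are illustrative.
class SkipAccounting {
    final LongAdder threadLevelSkips = new LongAdder(); // "skipped-records"
    final LongAdder deserSkips = new LongAdder();       // "skippedDueToDeserializationError"

    /** Returns how many records were actually queued. */
    int addRawRecords(List<String> rawRecords) {
        int added = 0;
        for (String raw : rawRecords) {
            if (raw == null) {            // models deserialize() returning null
                deserSkips.increment();
                continue;
            }
            if (timestampOf(raw) < 0) {   // models the invalid-timestamp drop:
                continue;                 // skipped but NOT counted per-node
            }
            added++;
        }
        // Thread-level metric: records.count() - numAddedRecords.
        threadLevelSkips.add(rawRecords.size() - added);
        return added;
    }

    static long timestampOf(String raw) {
        return raw.startsWith("-") ? -1L : 1L;
    }
}
```

This makes the observation concrete: the thread-level count is a superset of the node-level count, since timestamp drops bypass the node-level sensor.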
was (Author: vvcephei):
Thread-level metric looks like it only counts this:
org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative)
timestamp, the record is skipped and not added to the queue for processing
{quote}
Ah, but it drills down into
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we
have
{quote}final ConsumerRecord record =
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
continue;
}{quote}
All other code paths in there either add the record or throw.
The former of these cases is accounted in
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {{quote}
> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
> Issue Type: Improvement
> Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
> Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during
> user-customized processing on errors."
> Btw: we also drop records with a {{null}} key and/or value for certain DSL
> operations. This should be included as well.
--
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16418023#comment-16418023
]
John Roesler commented on KAFKA-6376:
-
Thread-level metric looks like it only counts this:
org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative)
timestamp, the record is skipped and not added to the queue for processing
{quote}
Ah, but it drills down into
org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords where we
have
{quote}final ConsumerRecord record =
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
continue;
}{quote}
All other code paths in there either add the record or throw.
The former of these cases is accounted in
org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {{quote}
> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
> Issue Type: Improvement
> Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
> Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during
> user-customized processing on errors."
> Btw: we also drop records with a {{null}} key and/or value for certain DSL
> operations. This should be included as well.
--
[
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Amit Sela updated KAFKA-6684:
-
Description:
Casting from BYTES is not supported, which means that casting LogicalTypes is
not supported.
This proposes to allow casting anything to a string, kind of like Java's
{{toString()}}, such that if the object is actually a LogicalType it can be
"serialized" as string instead of bytes+schema.
Examples:
{{BigDecimal}} will cast to the string representation of the number.
{{Timestamp}} will cast to the string representation of the timestamp, or maybe
UTC {{mmddTHH:MM:SS.f}} format?
Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's up
to the user to know the data.
This would help when using a JSON sink, or anything that's not Avro.
was:
Casting from BYTES is not supported, which means that casting LogicalTypes is
not supported.
This proposes to allow casting anything to a string, kind of like Java's
{{toString()}}, such that if the object is actually a LogicalType it can be
"serialized" as string instead of bytes+schema.
Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's up
to the user to know the data.
This would help when using a JSON sink, or anything that's not Avro.
> Support casting values with bytes schema to string
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's
> {{toString()}}, such that if the object is actually a LogicalType it can be
> "serialized" as string instead of bytes+schema.
> Examples:
> {{BigDecimal}} will cast to the string representation of the number.
> {{Timestamp}} will cast to the string representation of the timestamp, or
> maybe UTC {{mmddTHH:MM:SS.f}} format?
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
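As a sanity check of the proposal, Java's own `toString()` representations behave as described. A minimal sketch, where `castToString` is a hypothetical helper (not Connect's actual Cast transform) and hex-encoding is just one possible fallback for raw bytes:

```java
import java.math.BigDecimal;
import java.sql.Timestamp;

// Hypothetical toString()-style cast: fall back to the object's own string
// form, with a hex fallback for raw byte arrays (the "worst case" above).
class CastSketch {
    static String castToString(Object value) {
        if (value == null) return null;
        if (value instanceof byte[]) {
            // Worst case: the user must know the data; here we hex-encode.
            StringBuilder sb = new StringBuilder();
            for (byte b : (byte[]) value) sb.append(String.format("%02x", b));
            return sb.toString();
        }
        return value.toString();  // BigDecimal, Timestamp, etc.
    }
}
```

For example, a `BigDecimal("1.50")` would come out as "1.50", and a `java.sql.Timestamp` as its JDBC timestamp-escape form.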
--
[
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Amit Sela updated KAFKA-6684:
-
Description:
Casting from BYTES is not supported, which means that casting LogicalTypes is
not supported.
This proposes to allow casting anything to a string, kind of like Java's
{{toString()}}, such that if the object is actually a LogicalType it can be
"serialized" as string instead of bytes+schema.
{noformat}
Examples:
BigDecimal will cast to the string representation of the number.
Timestamp will cast to the string representation of the timestamp, or maybe UTC
mmddTHH:MM:SS.f format?
{noformat}
Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's up
to the user to know the data.
This would help when using a JSON sink, or anything that's not Avro.
was:
Casting from BYTES is not supported, which means that casting LogicalTypes is
not supported.
This proposes to allow casting anything to a string, kind of like Java's
{{toString()}}, such that if the object is actually a LogicalType it can be
"serialized" as string instead of bytes+schema.
Examples:
{{BigDecimal}} will cast to the string representation of the number.
{{Timestamp}} will cast to the string representation of the timestamp, or maybe
UTC {{mmddTHH:MM:SS.f}} format?
Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's up
to the user to know the data.
This would help when using a JSON sink, or anything that's not Avro.
> Support casting values with bytes schema to string
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's
> {{toString()}}, such that if the object is actually a LogicalType it can be
> "serialized" as string instead of bytes+schema.
>
> {noformat}
> Examples:
> BigDecimal will cast to the string representation of the number.
> Timestamp will cast to the string representation of the timestamp, or maybe
> UTC mmddTHH:MM:SS.f format?
> {noformat}
>
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
--
[
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Amit Sela updated KAFKA-6684:
-
Description:
Casting from BYTES is not supported, which means that casting LogicalTypes is
not supported.
This proposes to allow casting anything to a string, kind of like Java's
{{toString()}}, such that if the object is actually a LogicalType it can be
"serialized" as string instead of bytes+schema.
Examples:
{{BigDecimal}} will cast to the string representation of the number.
{{Timestamp}} will cast to the string representation of the timestamp, or maybe
UTC {{mmddTHH:MM:SS.f}} format?
Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's up
to the user to know the data.
This would help when using a JSON sink, or anything that's not Avro.
was:
Casting from BYTES is not supported, which means that casting LogicalTypes is
not supported.
This proposes to allow casting anything to a string, kind of like Java's
{{toString()}}, such that if the object is actually a LogicalType it can be
"serialized" as string instead of bytes+schema.
Examples:
{{BigDecimal}} will cast to the string representation of the number.
{{Timestamp}} will cast to the string representation of the timestamp, or maybe
UTC {{mmddTHH:MM:SS.f}} format?
Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's up
to the user to know the data.
This would help when using a JSON sink, or anything that's not Avro.
> Support casting values with bytes schema to string
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes is
> not supported.
> This proposes to allow casting anything to a string, kind of like Java's
> {{toString()}}, such that if the object is actually a LogicalType it can be
> "serialized" as string instead of bytes+schema.
> Examples:
> {{BigDecimal}} will cast to the string representation of the number.
> {{Timestamp}} will cast to the string representation of the timestamp, or
> maybe UTC {{mmddTHH:MM:SS.f}} format?
>
> Worst case, bytes are "casted" to whatever the {{toString()}} returns - it's
> up to the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
--
[
https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417959#comment-16417959
]
Cemalettin Koç commented on KAFKA-6713:
---
Hi [~guozhang],
For 1 and 2)
I would like to access my customized version of `CategoryInMemoryStore`. I
would be glad if you could write something simple for me; I could not follow
your comment.
3) My category topic is updated rarely. I need all the data, I am using a
GlobalKTable, and I do some rendering on my pages. Since it changes rarely, I
would like to cache it, but in case of an update I would like to invalidate
this cache. However, currently I cannot find a way to be notified when my
GlobalKTable-based InMemoryStore is updated. If a new Category is added or
changed in my InMemoryStore, I would like to trigger invalidation of my
rendering cache.
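A minimal sketch of the wrap-the-store idea, assuming the custom store's put() can be intercepted (all names are illustrative; this is not a Streams API):

```java
import java.util.HashMap;
import java.util.Map;

// Any category update clears the rendering cache; rendering lazily refills it.
class InvalidatingStore {
    private final Map<String, String> categories = new HashMap<>();
    private final Map<String, String> renderCache = new HashMap<>();

    void put(String key, String value) {
        categories.put(key, value);
        renderCache.clear();  // a category changed: drop cached renderings
    }

    String render(String key) {
        return renderCache.computeIfAbsent(key,
                k -> "<li>" + categories.get(k) + "</li>");
    }
}
```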
> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
>
> Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.0.1
>Reporter: Cemalettin Koç
>Priority: Major
> Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java,
> TypeConverter.java
>
>
> I am trying to use GlobalKTable with a custom store implementation. In my
> stores, I would like to store my `Category` entities and query them by name
> as well. My custom store has some capabilities beyond `get`, such as get by
> `name`. I also want to get all entries in a hierarchical way, in a lazy
> fashion. I have other use cases as well.
>
> In order to accomplish my task I had to implement a custom
> `KeyValueBytesStoreSupplier`, `BytesTypeConverter` and
>
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes,
> byte[]> {
> private BytesTypeConverter<K, V> converter;
> private KeyValueStore<K, V> delegated;
> public DelegatingByteStore(KeyValueStore<K, V> delegated,
> BytesTypeConverter<K, V> converter) {
> this.converter = converter;
> this.delegated = delegated;
> }
> @Override
> public void put(Bytes key, byte[] value) {
> delegated.put(converter.outerKey(key),
> converter.outerValue(value));
> }
> @Override
> public byte[] putIfAbsent(Bytes key, byte[] value) {
> V v = delegated.putIfAbsent(converter.outerKey(key),
> converter.outerValue(value));
> return v == null ? null : value;
> }
> ..
> {code}
>
> Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
> IK innerKey(final K key);
> IV innerValue(final V value);
> List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
> List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
> V outerValue(final IV value);
> KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
> KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
> K outerKey(final IK ik);
> }
> {code}
>
> This is unfortunately too cumbersome and hard to maintain.
>
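The boilerplate the reporter describes comes from converting keys and values on every call. A stripped-down, store-agnostic model of the same delegation pattern (illustrative names; a plain Map stands in for a Kafka state store):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Every operation crosses the inner/outer type boundary via converters,
// which is exactly what makes the real DelegatingByteStore cumbersome.
class DelegatingStore<IK, IV, K, V> {
    private final Map<K, V> delegate = new HashMap<>();
    private final Function<IK, K> keyOut;
    private final Function<IV, V> valueOut;

    DelegatingStore(Function<IK, K> keyOut, Function<IV, V> valueOut) {
        this.keyOut = keyOut;
        this.valueOut = valueOut;
    }

    void put(IK key, IV value) {
        delegate.put(keyOut.apply(key), valueOut.apply(value));
    }

    V get(IK key) {
        return delegate.get(keyOut.apply(key));
    }
}
```

Even in this toy form, every method needs a conversion step, and the real store interface has many more methods than put/get.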
--
[
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417946#comment-16417946
]
Guozhang Wang commented on KAFKA-5882:
--
Filed https://github.com/apache/kafka/pull/4790 to improve the logging upon
those errors.
> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix
> [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073]
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is
> thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with
> timeoutMillis = 9223372036854775807 ms.
> {code}
--
[
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417926#comment-16417926
]
Guozhang Wang commented on KAFKA-5882:
--
Thanks Vikas and Ionut-Maxim for reporting this issue; I think there are indeed
some bugs lurking around here. To help us further investigate, could you
provide more logs on the observed scenario? For example, after the
{code}
Error caught during partition assignment ..
{code}
there should be a {{Failed to rebalance.}} error message which exposes the
exception type and the stack trace. That can help us better locate which part
is throwing the exception (since exception.getMessage() returns null, I cannot
tell whether it is a task migration issue or something else).
> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix
> [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073]
> was made, but it introduced some other issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in case of a FATAL error, anything except an NPE is
> thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:123)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with
> timeoutMillis = 9223372036854775807 ms.
> {code}
--
[
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417922#comment-16417922
]
John Roesler commented on KAFKA-6474:
-
Hey [~h314to],
I hope you're doing well.
FYI, we have just merged [https://github.com/apache/kafka/pull/4760]
incorporating your pattern for hooking test-utils into the Streams test
dependencies. You may have conflicts to resolve when you rebase on trunk.
Thanks,
-John
> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
> Issue Type: Improvement
> Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
> Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our
> own tests to use this new test driver and remove the two classes
> ProcessorTopologyTestDriver and KStreamTestDriver.
--
[
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417896#comment-16417896
]
Jagadesh Adireddi commented on KAFKA-5253:
--
Hi [~mjsax],
I am new to contributing to Kafka. I made code changes for the above issue and
it's working as expected.
I have a few questions before I submit a pull request. I made changes inside
the *private* method
*KStreamTestDriver*#*sourceNodeByTopicName*. I haven't modified any method
signatures; I just embedded the code below:
{code:java}
Set<String> sourceTopics = topology.sourceTopics();
for (final String eachSourceTopic : sourceTopics) {
if (Pattern.compile(eachSourceTopic).matcher(topicName).matches()) {
return topology.source(eachSourceTopic);
}
}
{code}
Do I still need to submit a KIP for this change, since I am not touching any
public methods?
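The pattern-matching lookup embedded above depends only on standard java.util.regex semantics, which can be exercised in isolation (illustrative names, patterns kept as strings for simplicity):

```java
import java.util.List;
import java.util.regex.Pattern;

// Standalone model of the lookup: return the first registered source pattern
// that fully matches a concrete topic name, or null if none does.
class PatternSourceLookup {
    static String sourceFor(List<String> sourcePatterns, String topicName) {
        for (String p : sourcePatterns) {
            if (Pattern.compile(p).matcher(topicName).matches()) {
                return p;
            }
        }
        return null;
    }
}
```

Note that `matches()` requires the whole topic name to match, so "topic-source-\\d" accepts "topic-source-3" but not "topic-source-33".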
> TopologyTestDriver must handle streams created with patterns
>
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Assignee: Jagadesh Adireddi
>Priority: Major
> Labels: beginner, needs-kip, newbie
>
> *Context*
> -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
> The unit test of the topology fails because -KStreamTestDriver-
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
> Underneath is a unit test explaining what I understand should happen, but is
> failing.
> Explicitly adding a source topic matching the topic pattern, generates an
> exception as the topology builder explicitly checks overlapping topic names
> and patterns, in any order of adding pattern and topic. So, it is intended
> behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream<String, String> source =
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier<String, String> processorSupplier = new
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME),
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request
> for this change.
--
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-6376:
---
Affects Version/s: (was: 1.2.0)
1.0.0
> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
> Issue Type: Improvement
> Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
> Labels: needs-kip
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during
> user-customized processing on errors."
> Btw: we also drop records with {{null}} keys and/or values for certain DSL
> operations. This should be included as well.
[
https://issues.apache.org/jira/browse/KAFKA-6657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417858#comment-16417858
]
Guozhang Wang commented on KAFKA-6657:
--
Hello Siva, are you still working on this ticket?
> Add StreamsConfig prefix for different consumers
>
>
> Key: KAFKA-6657
> URL: https://issues.apache.org/jira/browse/KAFKA-6657
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: siva santhalingam
>Priority: Major
> Labels: beginner, needs-kip, newbie, newbie++
>
> Kafka Streams allows passing in different configs for different clients by
> prefixing the corresponding parameter with `producer.` or `consumer.`.
> However, Kafka Streams internally uses multiple consumers: (1) the main
> consumer, (2) the restore consumer, and (3) the global consumer (which is
> currently a restore consumer as well).
> For some use cases, it's required to set different configs for different
> consumers. Thus, we should add two new prefixes, for the restore and global
> consumers. We might also consider extending `KafkaClientSupplier` to add a
> `getGlobalConsumer()` method.
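A minimal sketch of how such per-client prefixes could be resolved. The
`restore.consumer.` and `global.consumer.` prefix strings are hypothetical
placeholders for the proposal above, not actual StreamsConfig constants:

```java
import java.util.Properties;

public class PrefixDemo {
    // Hypothetical prefixes mirroring the proposal: one per internal consumer.
    static final String MAIN = "consumer.";
    static final String RESTORE = "restore.consumer.";
    static final String GLOBAL = "global.consumer.";

    // Strip a prefix and return only the keys addressed to that client.
    static Properties configFor(Properties all, String prefix) {
        Properties out = new Properties();
        for (String key : all.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                out.setProperty(key.substring(prefix.length()), all.getProperty(key));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(MAIN + "max.poll.records", "500");
        props.setProperty(RESTORE + "max.poll.records", "2000");
        // Only the restore-prefixed keys reach the restore consumer.
        Properties restore = configFor(props, RESTORE);
        System.out.println(restore.getProperty("max.poll.records")); // prints 2000
    }
}
```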
[
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417856#comment-16417856
]
Guozhang Wang commented on KAFKA-6723:
--
The proposal is to have separate prefixes for the restore consumer and the
global consumer, not for a specific config only. I.e., users can prefix any
consumer config to differentiate between the different consumers, like:
consumer.poll.ms
restore-consumer.poll.ms
global-consumer.poll.ms
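The fallback behaviour this implies can be sketched as a plain lookup. The
prefix names are taken from the comment above; this is illustrative, not the
actual StreamsConfig implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerConfigLookup {
    // Resolve a config for a given consumer type, falling back to the
    // generic "consumer." prefix when no specific override is present.
    static String resolve(Map<String, String> cfg, String clientPrefix, String key) {
        String specific = cfg.get(clientPrefix + key);
        return specific != null ? specific : cfg.get("consumer." + key);
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put("consumer.poll.ms", "100");
        cfg.put("restore-consumer.poll.ms", "5");
        System.out.println(resolve(cfg, "restore-consumer.", "poll.ms")); // prints 5
        // No global-consumer override, so the generic value applies.
        System.out.println(resolve(cfg, "global-consumer.", "poll.ms"));  // prints 100
    }
}
```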
> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.records` config for both the
> restore consumer and the normal stream consumer. In reality, they perform
> different processing workloads, and to speed up restoration, the restore
> consumer should achieve higher throughput by setting `max.poll.records`
> higher. The change involved is trivial:
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name),
> so we need a KIP.
[
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
John Roesler updated KAFKA-6376:
Affects Version/s: (was: 1.0.0)
1.2.0
> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
> Issue Type: Improvement
> Components: metrics, streams
>Affects Versions: 1.2.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
> Labels: needs-kip
>
> Copied from the KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during
> user-customized processing on errors."
> Btw: we also drop records with {{null}} keys and/or values for certain DSL
> operations. This should be included as well.
[
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417807#comment-16417807
]
ASF GitHub Bot commented on KAFKA-6724:
---
hachikuji closed pull request #4787: KAFKA-6724 ConsumerPerformance resets
offsets on every startup
URL: https://github.com/apache/kafka/pull/4787
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index a3e60e652c4..7e0dbcbe064 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -157,8 +157,6 @@ object ConsumerPerformance extends LazyLogging {
       def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
         joinStart = System.currentTimeMillis
       }})
-    consumer.poll(0)
-    consumer.seekToBeginning(Collections.emptyList())
     // Now start the benchmark
     val startMs = System.currentTimeMillis
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
> Issue Type: Bug
> Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
> Fix For: 1.2.0
>
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for
> its group on every startup.
>
[
https://issues.apache.org/jira/browse/KAFKA-6642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417805#comment-16417805
]
Guozhang Wang commented on KAFKA-6642:
--
[~asurana] Thanks for creating this ticket; I was not aware that it had been
created some time ago. There is another ticket created recently:
https://issues.apache.org/jira/browse/KAFKA-6718. Could you take a look at the
proposed PR https://github.com/apache/kafka/pull/4785 and see if it aligns
with your proposals, and whether you could collaborate with [~_deepakgoyal]
on that PR?
> Rack aware task assignment in kafka streams
> ---
>
> Key: KAFKA-6642
> URL: https://issues.apache.org/jira/browse/KAFKA-6642
> Project: Kafka
> Issue Type: New Feature
> Components: streams
>Reporter: Ashish Surana
>Priority: Major
>
> We have rack aware replica assignment in kafka broker ([KIP-36 Rack aware
> replica
> assignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]).
> This request is to have a similar feature for Kafka Streams applications.
> Standby task/replica assignment in Kafka Streams is currently not rack
> aware, and this request is to make it rack aware for better availability.
[
https://issues.apache.org/jira/browse/KAFKA-6538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-6538:
-
Description:
In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and
only have concrete key/value types on the outer layers/wrappers of the stores.
For this reason, the innermost {{RocksDBStore}} cannot provide useful error
messages anymore if a put/get/delete operation fails, as it only handles plain
bytes.
In addition, the corresponding calls that send changelog records to the record
collectors will also carry only byte arrays, and hence when an error happens,
the record collector can only display the key but not the value, since it is
all bytes:
{code}
[ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
task [2_2] Error sending record (key {"eventId":XXX,"version":123}
value [] timestamp YYY) to topic TTT
due to ...
{code}
Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with
corresponding information about which key/value the operation failed for, in
the wrapping stores (KeyValueStore, WindowStore, and SessionStore).
Cf. https://github.com/apache/kafka/pull/4518 which cleans up {{RocksDBStore}}
exceptions.
was:
In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and
only have concrete key/value types on the outer layers/wrappers of the stores.
For this reason, the innermost {{RocksDBStore}} cannot provide useful error
messages anymore if a put/get/delete operation fails, as it only handles plain
bytes.
Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with
corresponding information about which key/value the operation failed for, in
the wrapping stores (KeyValueStore, WindowStore, and SessionStore).
Cf. https://github.com/apache/kafka/pull/4518 which cleans up {{RocksDBStore}}
exceptions.
> Enhance ByteStore exceptions with more context information
> --
>
> Key: KAFKA-6538
> URL: https://issues.apache.org/jira/browse/KAFKA-6538
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Minor
> Labels: newbie
>
> In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and
> only have concrete key/value types on the outer layers/wrappers of the stores.
> For this reason, the innermost {{RocksDBStore}} cannot provide useful error
> messages anymore if a put/get/delete operation fails, as it only handles plain
> bytes.
> In addition, the corresponding calls that send changelog records to the record
> collectors will also carry only byte arrays, and hence when an error happens,
> the record collector can only display the key but not the value, since it is
> all bytes:
> {code}
> [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
> task [2_2] Error sending record (key {"eventId":XXX,"version":123}
> value [] timestamp YYY) to topic TTT
> due to ...
> {code}
> Therefore, we should enhance exceptions thrown from {{RocksDBStore}} with
> corresponding information about which key/value the operation failed for, in
> the wrapping stores (KeyValueStore, WindowStore, and SessionStore).
> Cf. https://github.com/apache/kafka/pull/4518 which cleans up {{RocksDBStore}}
> exceptions.
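One way a wrapping store could attach key context when rethrowing, as a
hedged sketch: the exception class and message format below are hypothetical,
not the actual Kafka Streams API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical wrapper exception carrying the failing key alongside the cause.
public class ContextualStoreException extends RuntimeException {
    ContextualStoreException(String op, byte[] key, Throwable cause) {
        // Render the raw key bytes; a real wrapping store, which knows the
        // serdes, could render the typed key instead.
        super("Error in " + op + " for key " + new String(key, StandardCharsets.UTF_8), cause);
    }

    public static void main(String[] args) {
        byte[] key = "eventId-123".getBytes(StandardCharsets.UTF_8);
        try {
            try {
                // Stand-in for a failing byte-store operation.
                throw new RuntimeException("rocksdb write failed");
            } catch (RuntimeException e) {
                throw new ContextualStoreException("put", key, e);
            }
        } catch (ContextualStoreException e) {
            System.out.println(e.getMessage()); // prints: Error in put for key eventId-123
        }
    }
}
```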
[
https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jagadesh Adireddi reassigned KAFKA-5253:
Assignee: Jagadesh Adireddi
> TopologyTestDriver must handle streams created with patterns
>
>
> Key: KAFKA-5253
> URL: https://issues.apache.org/jira/browse/KAFKA-5253
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Wim Van Leuven
>Assignee: Jagadesh Adireddi
>Priority: Major
> Labels: beginner, needs-kip, newbie
>
> *Context*
> -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to
> unit test topologies while developing KStreams apps.
> One such topology uses a Pattern to consume from multiple topics at once.
> *Problem*
> The unit test of the topology fails because -KStreamTestDriver-
> TopologyTestDriver fails to deal with Patterns properly.
> *Example*
> Underneath is a unit test explaining what I understand should happen, but it
> is failing.
> Explicitly adding a source topic matching the topic pattern generates an
> exception, as the topology builder explicitly checks for overlapping topic
> names and patterns, in any order of adding the pattern and topic. So this is
> intended behaviour.
> {code:java}
> @Test
> public void shouldProcessFromSourcesThatDoMatchThePattern() {
> // -- setup stream pattern
> final KStream source =
> builder.stream(Pattern.compile("topic-source-\\d"));
> source.to("topic-sink");
> // -- setup processor to capture results
> final MockProcessorSupplier processorSupplier = new
> MockProcessorSupplier<>();
> source.process(processorSupplier);
> // -- add source to stream data from
> //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME),
> "topic-source-3");
> // -- build test driver
> driver = new KStreamTestDriver(builder);
> driver.setTime(0L);
> // -- test
> driver.process("topic-source-3", "A", "aa");
> // -- validate
> // no exception was thrown
> assertEquals(Utils.mkList("A:aa"), processorSupplier.processed);
> }
> {code}
> *Solution*
> If anybody can help in defining the solution, I can create a pull request
> for this change.
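Independently of the test driver, the pattern subscription itself can be
checked with plain java.util.regex, using the same pattern as the test above:

```java
import java.util.regex.Pattern;

public class TopicPatternCheck {
    public static void main(String[] args) {
        // Same pattern used in the failing test: one digit after the prefix.
        Pattern p = Pattern.compile("topic-source-\\d");
        System.out.println(p.matcher("topic-source-3").matches()); // prints true
        System.out.println(p.matcher("topic-sink").matches());     // prints false
    }
}
```

So "topic-source-3" does match the pattern, which is why the explicit
addSource call in the commented-out line would overlap with it.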
[
https://issues.apache.org/jira/browse/KAFKA-6598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417741#comment-16417741
]
Ismael Juma commented on KAFKA-6598:
This was discussed before when "KIP-30: Allow for brokers to have plug-able
consensus and meta data storage sub systems" was proposed. It's worth reading
that discussion for the concerns with a pluggable approach.
> Kafka to support using ETCD beside Zookeeper
>
>
> Key: KAFKA-6598
> URL: https://issues.apache.org/jira/browse/KAFKA-6598
> Project: Kafka
> Issue Type: New Feature
> Components: clients, core
>Reporter: Sebastian Toader
>Priority: Major
>
> The current Kafka implementation is bound to {{Zookeeper}} to store its
> metadata for forming a cluster of nodes (producer/consumer/broker).
> As Kafka is becoming popular for streaming in various environments where
> {{Zookeeper}} is either not easy to deploy/manage or where there are better
> alternatives, there is a need to run Kafka with a metastore implementation
> other than {{Zookeeper}}.
> {{etcd}} can provide the same semantics as {{Zookeeper}} for Kafka, and since
> {{etcd}} is the favored choice in certain environments (e.g. Kubernetes),
> Kafka should be able to run with {{etcd}}.
> From the user's point of view, it should be straightforward to configure
> Kafka to use {{etcd}} by simply specifying a connection string that points to
> an {{etcd}} cluster.
> To avoid introducing instability, the original interfaces should be kept and
> only the low-level {{Zookeeper}} API calls should be replaced with {{etcd}}
> API calls in case Kafka is configured to use {{etcd}}.
> In the long run (which is out of scope of this jira) there should be an
> abstraction layer in Kafka which various metastore implementations would
> then implement.
[
https://issues.apache.org/jira/browse/KAFKA-6723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417697#comment-16417697
]
Boyang Chen commented on KAFKA-6723:
[~mjsax] [~guozhang] I haven't seen a concrete design for KAFKA-6657. So am I
expecting something like:
default.max.poll.records
restore.max.poll.records
global.max.poll.records
If that covers these cases by creating new consumer config parameters, I guess
this Jira is a duplicate. Btw, I think the default values should be the same.
> Separate "max.poll.record" for restore consumer and common consumer
> ---
>
> Key: KAFKA-6723
> URL: https://issues.apache.org/jira/browse/KAFKA-6723
> Project: Kafka
> Issue Type: Improvement
> Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Minor
>
> Currently, Kafka Streams uses the `max.poll.records` config for both the
> restore consumer and the normal stream consumer. In reality, they perform
> different processing workloads, and to speed up restoration, the restore
> consumer should achieve higher throughput by setting `max.poll.records`
> higher. The change involved is trivial:
> [https://github.com/abbccdda/kafka/commit/cace25b74f31c8da79e93b514bcf1ed3ea9a7149]
> However, this is still a public API change (introducing a new config name),
> so we need a KIP.
[
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alex Dunayevsky updated KAFKA-6724:
---
Reviewer: Alex Dunayevsky
> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
> Issue Type: Bug
> Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for
> its group on every startup.
>
[
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alex Dunayevsky updated KAFKA-6724:
---
Reviewer: (was: Alex Dunayevsky)
> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
> Issue Type: Bug
> Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for
> its group on every startup.
>
[
https://issues.apache.org/jira/browse/KAFKA-6716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417372#comment-16417372
]
ASF GitHub Bot commented on KAFKA-6716:
---
rajinisivaram closed pull request #4783: KAFKA-6716: Should close the
`discardChannel` in completeSend
URL: https://github.com/apache/kafka/pull/4783
This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
index bd27d5c6d99..200d5111517 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java
@@ -124,11 +124,12 @@ private void completeInitiatedSends() throws IOException {
     private void completeSend(Send send) throws IOException {
         // Consume the send so that we will be able to send more requests to the destination
-        ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
-        while (!send.completed()) {
-            send.writeTo(discardChannel);
+        try (ByteBufferChannel discardChannel = new ByteBufferChannel(send.size())) {
+            while (!send.completed()) {
+                send.writeTo(discardChannel);
+            }
+            completedSends.add(send);
         }
-        completedSends.add(send);
     }

     private void completeDelayedReceives() {
private void completeDelayedReceives() {
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
> discardChannel should be released in MockSelector#completeSend
> --
>
> Key: KAFKA-6716
> URL: https://issues.apache.org/jira/browse/KAFKA-6716
> Project: Kafka
> Issue Type: Test
>Reporter: Ted Yu
>Assignee: huxihx
>Priority: Minor
>
> {code}
> private void completeSend(Send send) throws IOException {
>     // Consume the send so that we will be able to send more requests to
>     // the destination
>     ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
>     while (!send.completed()) {
>         send.writeTo(discardChannel);
>     }
>     completedSends.add(send);
> }
> {code}
> The {{discardChannel}} should be closed before returning from the method
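The merged fix above uses try-with-resources, which guarantees the channel is
closed even on an exception path. A standalone illustration with a stand-in
`AutoCloseable` (not the real `ByteBufferChannel`):

```java
public class CloseDemo {
    // Minimal AutoCloseable standing in for ByteBufferChannel.
    static class TrackingChannel implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        TrackingChannel ch = new TrackingChannel();
        try (TrackingChannel c = ch) {
            // writes into the channel would happen here; close() runs
            // automatically when this block exits, even via an exception
        }
        System.out.println(ch.closed); // prints true
    }
}
```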
[
https://issues.apache.org/jira/browse/KAFKA-6724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417369#comment-16417369
]
ASF GitHub Bot commented on KAFKA-6724:
---
rootex- opened a new pull request #4787: KAFKA-6724 ConsumerPerformance resets
offsets on every startup
URL: https://github.com/apache/kafka/pull/4787
Remove consumer offset reset on startup
### Committer Checklist (excluded from commit message)
- [ x ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
> ConsumerPerformance resets offsets on every startup
> ---
>
> Key: KAFKA-6724
> URL: https://issues.apache.org/jira/browse/KAFKA-6724
> Project: Kafka
> Issue Type: Bug
> Components: core, tools
>Affects Versions: 0.11.0.1
>Reporter: Alex Dunayevsky
>Priority: Minor
>
> ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for
> its group on every startup.
>
Alex Dunayevsky created KAFKA-6724:
--
Summary: ConsumerPerformance resets offsets on every startup
Key: KAFKA-6724
URL: https://issues.apache.org/jira/browse/KAFKA-6724
Project: Kafka
Issue Type: Bug
Components: core, tools
Affects Versions: 0.11.0.1
Reporter: Alex Dunayevsky
ConsumerPerformance used in kafka-consumer-perf-test.sh resets offsets for its
group on every startup.
[
https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406470#comment-16406470
]
Amit Sela edited comment on KAFKA-6684 at 3/28/18 1:32 PM:
---
[~hachikuji] or [~ewencp] mind taking a look?
was (Author: amitsela):
[~hachikuji] mind taking a look?
> Support casting values with bytes schema to string
> ---
>
> Key: KAFKA-6684
> URL: https://issues.apache.org/jira/browse/KAFKA-6684
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
>Reporter: Amit Sela
>Priority: Major
>
> Casting from BYTES is not supported, which means that casting LogicalTypes
> is not supported.
> This proposes to allow casting anything to a string, kind of like Java's
> {{toString()}}, such that if the object is actually a LogicalType it can be
> "serialized" as a string instead of bytes+schema.
> Worst case, bytes are "cast" to whatever {{toString()}} returns - it's up to
> the user to know the data.
> This would help when using a JSON sink, or anything that's not Avro.
[
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417173#comment-16417173
]
Deepak Goyal commented on KAFKA-6718:
-
Meanwhile, please look at the PR: [https://github.com/apache/kafka/pull/4785]
> Rack Aware Replica Task Assignment for Kafka Streams
>
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
> Issue Type: New Feature
> Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Assignee: Deepak Goyal
>Priority: Major
> Labels: needs-kip
>
> Machines in a data centre are sometimes grouped in racks. Racks provide
> isolation as each rack may be in a different physical location and has its
> own power source. When tasks are properly replicated across racks, it
> provides fault tolerance in that if a rack goes down, the remaining racks can
> continue to serve traffic.
>
> This feature is already implemented in Kafka
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
> but we need something similar for task assignments at the Kafka Streams
> application layer.
>
> This feature enables replica tasks to be assigned on different racks for
> fault-tolerance.
> NUM_STANDBY_REPLICAS = x
> totalTasks = x+1 (replica + active)
> # If no rackId is provided: the cluster will behave rack-unaware
> # If the same rackId is given to all the nodes: the cluster will behave
> rack-unaware
> # If (totalTasks <= number of racks), then the cluster will be rack aware,
> i.e. each replica task is assigned to a different rack.
> # If (totalTasks > number of racks), then it will first assign tasks on
> different racks; further tasks will be assigned to the least loaded node,
> cluster wide.
> We have added another config in StreamsConfig called "RACK_ID_CONFIG" which
> helps StickyPartitionAssignor to assign tasks in such a way that no two
> replica tasks are on the same rack if possible.
> After that, it also helps to maintain stickiness within the rack.
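The placement rules above amount to round-robin over racks with wrap-around;
a toy sketch under that simplification (the "least loaded" tie-break is
reduced to round-robin, and this is not the actual StickyPartitionAssignor
logic):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RackAwareSketch {
    // Place totalTasks copies of one task across racks: one per rack first,
    // then wrap around once the racks are exhausted.
    static List<String> assign(List<String> racks, int totalTasks) {
        List<String> placement = new ArrayList<>();
        for (int i = 0; i < totalTasks; i++) {
            placement.add(racks.get(i % racks.size()));
        }
        return placement;
    }

    public static void main(String[] args) {
        List<String> racks = Arrays.asList("rack-a", "rack-b", "rack-c");
        // NUM_STANDBY_REPLICAS = 1 -> totalTasks = 2 <= 3 racks:
        // each copy lands on a distinct rack.
        System.out.println(assign(racks, 2)); // prints [rack-a, rack-b]
        // totalTasks = 4 > 3 racks: the fourth copy must reuse a rack.
        System.out.println(assign(racks, 4)); // prints [rack-a, rack-b, rack-c, rack-a]
    }
}
```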
[
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417158#comment-16417158
]
Ionut-Maxim Margelatu commented on KAFKA-5882:
--
We have been able to consistently reproduce this issue. In our case, the cause
is a change of topology between deployments:
* the existing deployment uses 2 subtopologies (A->B,B->C)
* the new deployment uses 3 subtopologies (A->B,A'->B,B->C)
Performing a deployment with an updated topology results in a failure.
Performing a deployment with the same topology works perfectly.
Here are some logs we added to StreamsThread in order to figure out what was
going on:
{noformat}
2018-03-28T09:51:32 srv="mepw" [MEP-StreamThread-2] WARN
org.apache.kafka.streams.processor.internals.StreamTask
partition=mepw_internal-47 partitionTopic=mepw_internal
topology=ProcessorTopology:
hubSource:
topics:[mepw_change_event]
children: [CDCProcessor]
CDCProcessor:
children: [CDCSink]
CDCSink:
topic:mepw_internal
2018-03-28T09:51:32 srv="mepw" [MEP-StreamThread-2] ERROR
org.apache.kafka.streams.processor.internals.StreamThread stream-thread
[MEP-StreamThread-2] Error caught during partition assignment, will abort the
current process and re-throw at the end of rebalance: null
{noformat}
During rebalancing, the Kafka Streams threads in the new app instances are
assigned to a sub-topology that doesn't necessarily have a source for the
partitions they were assigned. This is what we could figure out; I hope it
helps other people.
> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073]
> was made, but it introduced another issue.
> In some cases (I am not sure which ones) I got an NPE (below).
> I would expect that even in the case of a FATAL error, anything except an
> NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at
> org.apache.kafka.streams.processor.internals.StreamTask.&lt;init&gt;(StreamTask.java:123)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO
[
https://issues.apache.org/jira/browse/KAFKA-6718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ashish Surana updated KAFKA-6718:
-
Description:
Machines in a data centre are sometimes grouped in racks. Racks provide
isolation as each rack may be in a different physical location and has its own
power source. When tasks are properly replicated across racks, it provides
fault tolerance in that if a rack goes down, the remaining racks can continue
to serve traffic.
This feature is already implemented in Kafka
[KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
but we need something similar for task assignments at the Kafka Streams
application layer.
This feature enables replica tasks to be assigned on different racks for
fault-tolerance.
NUM_STANDBY_REPLICAS = x
totalTasks = x+1 (replica + active)
# If no rackId is provided: the cluster will behave rack-unaware
# If the same rackId is given to all the nodes: the cluster will behave
rack-unaware
# If (totalTasks <= number of racks), then the cluster will be rack aware,
i.e. each replica task is assigned to a different rack.
# If (totalTasks > number of racks), then it will first assign tasks on
different racks; further tasks will be assigned to the least loaded node,
cluster wide.
We have added another config in StreamsConfig called "RACK_ID_CONFIG" which
helps StickyPartitionAssignor to assign tasks in such a way that no two replica
tasks are on the same rack if possible.
After that, it also helps to maintain stickiness within the rack.
was:
|Machines in data centre are sometimes grouped in racks. Racks provide
isolation as each rack may be in a different physical location and has its own
power source. When tasks are properly replicated across racks, it provides
fault tolerance in that if a rack goes down, the remaining racks can continue
to serve traffic.
This feature is already implemented in Kafka
[KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
but we needed something similar for task assignment at the Kafka Streams
application layer.
This feature enables replica tasks to be assigned to different racks for
fault tolerance.
NUM_STANDBY_REPLICAS = x
totalTasks = x+1 (replica + active)
# If no rackId is provided: the cluster will behave rack-unaware
# If the same rackId is given to all the nodes: the cluster will behave rack-unaware
# If (totalTasks >= number of racks), then the cluster will be rack-aware, i.e.
each replica task is assigned to a different rack.
# If (totalTasks < number of racks), then it will first assign tasks on
different racks; further tasks will be assigned to the least loaded node,
cluster-wide.|
We have added another config in StreamsConfig called "RACK_ID_CONFIG", which
helps StickyPartitionAssignor assign tasks in such a way that no two replica
tasks are on the same rack where possible.
Beyond that, it also helps maintain stickiness within the rack.|
> Rack Aware Replica Task Assignment for Kafka Streams
>
>
> Key: KAFKA-6718
> URL: https://issues.apache.org/jira/browse/KAFKA-6718
> Project: Kafka
> Issue Type: New Feature
> Components: streams
>Affects Versions: 1.1.0
>Reporter: Deepak Goyal
>Assignee: Deepak Goyal
>Priority: Major
> Labels: needs-kip
>
> |Machines in data centre are sometimes grouped in racks. Racks provide
> isolation as each rack may be in a different physical location and has its
> own power source. When tasks are properly replicated across racks, it
> provides fault tolerance in that if a rack goes down, the remaining racks can
> continue to serve traffic.
>
> This feature is already implemented in Kafka
> [KIP-36|https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment]
> but we needed something similar for task assignment at the Kafka Streams
> application layer.
>
> This feature enables replica tasks to be assigned to different racks for
> fault tolerance.
> NUM_STANDBY_REPLICAS = x
> totalTasks = x+1 (replica + active)
> # If no rackId is provided: the cluster will behave rack-unaware
> # If the same rackId is given to all the nodes: the cluster will behave rack-unaware
> # If (totalTasks <= number of racks), then the cluster will be rack-aware, i.e.
> each replica task is assigned to a different rack.
> # If (totalTasks > number of racks), then it will first assign tasks on
> different racks; further tasks will be assigned to the least loaded node,
> cluster-wide.|
> We have added another config in StreamsConfig called "RACK_ID_CONFIG", which
> helps StickyPartitionAssignor assign tasks in such a way that no two
> replica tasks are on the same rack where possible.
> Beyond that, it also helps maintain stickiness within the rack.|
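The assignment rules quoted above can be sketched roughly as follows. This is a minimal illustration with invented names (`Node`, `assignTask`), not the actual StickyPartitionAssignor implementation: each replica of a task goes to a rack not yet used by that task, picking the least-loaded node, and falls back cluster-wide once the racks are exhausted (totalTasks > number of racks).

```java
import java.util.*;
import java.util.stream.Collectors;

// Hypothetical sketch of the rack-aware placement rule described above;
// all names here are invented for illustration.
class RackAwareSketch {
    record Node(String id, String rackId) {}

    // Assign `replicas` copies of one task: prefer racks not yet used by this
    // task, then pick the least-loaded node; once every rack already hosts a
    // replica, fall back to the least-loaded node cluster-wide.
    static List<Node> assignTask(int replicas, List<Node> nodes, Map<String, Integer> load) {
        Set<String> usedRacks = new HashSet<>();
        List<Node> placed = new ArrayList<>();
        for (int i = 0; i < replicas; i++) {
            List<Node> fresh = nodes.stream()
                    .filter(n -> !usedRacks.contains(n.rackId()))
                    .collect(Collectors.toList());
            List<Node> pool = fresh.isEmpty() ? nodes : fresh; // racks exhausted
            Node chosen = pool.stream()
                    .min(Comparator.comparingInt((Node n) -> load.getOrDefault(n.id(), 0)))
                    .orElseThrow();
            usedRacks.add(chosen.rackId());
            load.merge(chosen.id(), 1, Integer::sum); // track per-node load
            placed.add(chosen);
        }
        return placed;
    }
}
```

With three nodes on two racks, two replicas land on distinct racks; a third replica necessarily reuses a rack, matching the rule list above.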
--
[
https://issues.apache.org/jira/browse/KAFKA-5882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16417038#comment-16417038
]
Ionut-Maxim Margelatu commented on KAFKA-5882:
--
We are encountering the same issue with 0.11.0.2.
> NullPointerException in StreamTask
> --
>
> Key: KAFKA-5882
> URL: https://issues.apache.org/jira/browse/KAFKA-5882
> Project: Kafka
> Issue Type: Bug
> Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> It seems the bugfix [KAFKA-5073|https://issues.apache.org/jira/browse/KAFKA-5073]
> was made, but it introduced another issue.
> In some cases (I am not sure which ones) I get an NPE (below).
> I would expect that even in the case of a FATAL error, anything other than an
> NPE is thrown.
> {code}
> 2017-09-12 23:34:54 ERROR ConsumerCoordinator:269 - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group streamer failed on partition assignment
> java.lang.NullPointerException: null
> at
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:123)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> ~[myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> [myapp-streamer.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> [myapp-streamer.jar:?]
> 2017-09-12 23:34:54 INFO StreamThread:1040 - stream-thread
> [streamer-3a44578b-faa8-4b5b-bbeb-7a7f04639563-StreamThread-1] Shutting down
> 2017-09-12 23:34:54 INFO KafkaProducer:972 - Closing the Kafka producer with
> timeoutMillis = 9223372036854775807 ms.
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[
https://issues.apache.org/jira/browse/KAFKA-6649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416912#comment-16416912
]
Srinivas Dhruvakumar commented on KAFKA-6649:
-
[~hachikuji] Still hitting the bug after testing with the patch.
> ReplicaFetcher stopped after non fatal exception is thrown
> --
>
> Key: KAFKA-6649
> URL: https://issues.apache.org/jira/browse/KAFKA-6649
> Project: Kafka
> Issue Type: Bug
> Components: replication
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
>Reporter: Julio Ng
>Priority: Major
>
> We have seen several under-replicated partitions, usually triggered by topic
> creation. After digging in the logs, we see the below:
> {noformat}
> [2018-03-12 22:40:17,641] ERROR [ReplicaFetcher replicaId=12, leaderId=0,
> fetcherId=1] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition
> [[TOPIC_NAME_REMOVED]]-84 offset 2098535
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:169)
> at scala.Option.foreach(Option.scala:257)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:166)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:166)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:166)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:164)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot
> increment the log start offset to 2098535 of partition
> [[TOPIC_NAME_REMOVED]]-84 since it is larger than the high watermark -1
> [2018-03-12 22:40:17,641] INFO [ReplicaFetcher replicaId=12, leaderId=0,
> fetcherId=1] Stopped (kafka.server.ReplicaFetcherThread){noformat}
> It looks like that after the ReplicaFetcherThread is stopped, the replicas
> start to lag behind, presumably because we are not fetching from the leader
> anymore. Further examining, the ShutdownableThread.scala object:
> {noformat}
> override def run(): Unit = {
>   info("Starting")
>   try {
>     while (isRunning)
>       doWork()
>   } catch {
>     case e: FatalExitError =>
>       shutdownInitiated.countDown()
>       shutdownComplete.countDown()
>       info("Stopped")
>       Exit.exit(e.statusCode())
>     case e: Throwable =>
>       if (isRunning)
>         error("Error due to", e)
>   } finally {
>     shutdownComplete.countDown()
>   }
>   info("Stopped")
> }{noformat}
> For the Throwable (non-fatal) case, it just exits the while loop and the
> thread stops doing work. I am not sure whether this is the intended behavior
> of ShutdownableThread, or whether the exception should be caught so that we
> keep calling doWork().
>
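The concern in the report above is that the `try` wraps the whole `while` loop, so a single non-fatal Throwable ends the loop. A minimal, hypothetical sketch (invented `ResilientWorker` name, not the actual Kafka patch) of the alternative the reporter suggests, catching inside the loop so the thread survives transient errors:

```java
// Hypothetical sketch: catching non-fatal errors inside the loop keeps the
// worker alive instead of silently ending the while loop, as described above.
class ResilientWorker {
    volatile boolean isRunning = true;
    int errorCount = 0;              // stand-in for error("Error due to", e)
    private final Runnable work;     // stand-in for doWork()

    ResilientWorker(Runnable work) { this.work = work; }

    void run() {
        while (isRunning) {
            try {
                work.run();
            } catch (Exception e) {  // non-fatal: log and keep calling doWork()
                errorCount++;
            }
        }
    }
}
```

In this sketch a transient failure is counted and the loop continues; a real fix would still need to let fatal errors (the `FatalExitError` case) terminate the process as before.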
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[
https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16416862#comment-16416862
]
Srinivas Dhruvakumar commented on KAFKA-:
-
[~huxi_2b] - tried the latest patch for KAFKA-3978; still hitting this bug.
> OffsetOutOfRangeException: Replica Thread Stopped Resulting in
> Underreplicated Partitions
> -
>
> Key: KAFKA-
> URL: https://issues.apache.org/jira/browse/KAFKA-
> Project: Kafka
> Issue Type: Bug
> Components: core
>Affects Versions: 0.11.0.1
>Reporter: Srinivas Dhruvakumar
>Priority: Critical
> Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png
>
>
> Hello All,
> Currently we are seeing a few under-replicated partitions on our test
> cluster, which is used for integration testing. On debugging further, we
> found the replica thread was stopped due to an error:
> Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot
> increment the log start offset to 50 of partition since it is larger
> than the high watermark -1
> Kindly find the attached screenshot.
> !Screen Shot 2018-03-15 at 3.52.13 PM.png!
>
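The error quoted above reflects a broker-side invariant: the log start offset must never move past the high watermark (here reported as -1, i.e. not yet established). A minimal, hypothetical sketch of that guard (invented names, not the actual broker code):

```java
// Hypothetical sketch of the invariant behind the error message above.
class LogStartOffsetGuard {
    // Returns the new log start offset, refusing to advance past the
    // high watermark (the highest offset replicated to all in-sync replicas).
    static long maybeIncrementLogStartOffset(long current, long proposed, long highWatermark) {
        if (proposed > highWatermark) {
            throw new IllegalStateException(
                    "Cannot increment the log start offset to " + proposed
                    + " since it is larger than the high watermark " + highWatermark);
        }
        return Math.max(current, proposed);
    }
}
```

A high watermark of -1, as in the report, rejects any proposed start offset, which is why the fetcher thread hit this exception right after topic creation.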
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)