[jira] [Resolved] (KAFKA-6445) Remove deprecated metrics in 2.0
[ https://issues.apache.org/jira/browse/KAFKA-6445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-6445. Resolution: Fixed Assignee: Dong Lin (was: Charly Molter) Fix Version/s: (was: 2.1.0) 2.0.0 > Remove deprecated metrics in 2.0 > > > Key: KAFKA-6445 > URL: https://issues.apache.org/jira/browse/KAFKA-6445 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Charly Molter >Assignee: Dong Lin >Priority: Trivial > Fix For: 2.0.0 > > > As part of KIP-225 we've replaced a metric and deprecated the old one. > We should remove these metrics in 2.0.0; this Jira is to track all of the > metrics to remove in 2.0.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5950) AdminClient should retry based on returned error codes
[ https://issues.apache.org/jira/browse/KAFKA-5950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513367#comment-16513367 ] Ismael Juma commented on KAFKA-5950: [~cmccabe], this has been done, right? > AdminClient should retry based on returned error codes > -- > > Key: KAFKA-5950 > URL: https://issues.apache.org/jira/browse/KAFKA-5950 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Andrey Dyachkov >Priority: Major > Labels: needs-kip > Fix For: 2.1.0 > > > The AdminClient only retries if the request fails with a retriable error. If > a response is returned, then a retry is never attempted. This is inconsistent > with other clients that check the error codes in the response and retry for > each retriable error code. > We should consider whether it makes sense to adopt this behaviour in the > AdminClient so that users don't have to do it themselves. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
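The behaviour the KAFKA-5950 description asks for (inspect the error code carried in a response and retry when it is retriable) can be sketched without any Kafka dependency. Everything below is hypothetical and only illustrates the idea: the `Code` enum, the `RETRIABLE` set, and `callWithRetry` are made-up names, not the AdminClient's API or Kafka's real error list.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Supplier;

final class ErrorCodeRetry {
    // Hypothetical error codes for illustration only; not Kafka's real Errors enum.
    enum Code { NONE, NOT_CONTROLLER, REQUEST_TIMED_OUT, INVALID_TOPIC }

    // Codes this sketch treats as worth retrying (an assumption, not Kafka's list).
    static final Set<Code> RETRIABLE = EnumSet.of(Code.NOT_CONTROLLER, Code.REQUEST_TIMED_OUT);

    // Issues the request up to maxAttempts times and returns the last code seen.
    // A retry happens only when the *response* carries a retriable code.
    static Code callWithRetry(Supplier<Code> request, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Code last = request.get();
        for (int attempt = 2; attempt <= maxAttempts && RETRIABLE.contains(last); attempt++) {
            last = request.get();
        }
        return last;
    }

    public static void main(String[] args) {
        // Simulated broker: first response is retriable, second succeeds.
        Iterator<Code> responses = Arrays.asList(Code.NOT_CONTROLLER, Code.NONE).iterator();
        System.out.println(callWithRetry(responses::next, 3));
    }
}
```

A real client would also back off between attempts and respect an overall timeout; both are left out here to keep the retry decision itself visible.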
[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct
[ https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513358#comment-16513358 ] ASF GitHub Bot commented on KAFKA-7021: --- guozhangwang closed pull request #5232: KAFKA-7021: checkpoint offsets from committed URL: https://github.com/apache/kafka/pull/5232 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 364fbe855d1..7f6ac7ca614 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -163,7 +163,7 @@ public String toString(final String indent) { return sb.toString(); } -protected Map recordCollectorOffsets() { +protected Map activeTaskCheckpointableOffsets() { return Collections.emptyMap(); } @@ -234,7 +234,7 @@ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateExcep ProcessorStateException exception = null; log.trace("{} Closing state manager", logPrefix); try { -stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null); +stateMgr.close(writeCheckpoint ? 
activeTaskCheckpointableOffsets() : null); } catch (final ProcessorStateException e) { exception = e; } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 8d46da19904..a18175ac58b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -81,7 +81,7 @@ public StateDirectory(final String applicationId, final String stateDirConfig, f * @param taskId * @return directory for the {@link TaskId} */ -File directoryForTask(final TaskId taskId) { +public File directoryForTask(final TaskId taskId) { final File taskDir = new File(stateDir, taskId.toString()); if (!taskDir.exists() && !taskDir.mkdir()) { throw new ProcessorStateException(String.format("task directory [%s] doesn't exist and couldn't be created", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 4b24aab65a3..86855f39c6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -286,7 +286,7 @@ void commit(final boolean startNewTransaction) { public void run() { flushState(); if (!eosEnabled) { -stateMgr.checkpoint(recordCollectorOffsets()); +stateMgr.checkpoint(activeTaskCheckpointableOffsets()); } commitOffsets(startNewTransaction); } @@ -297,8 +297,17 @@ public void run() { } @Override -protected Map recordCollectorOffsets() { -return recordCollector.offsets(); + +protected Map activeTaskCheckpointableOffsets() { +// put both producer acked offsets and consumer committed offsets as checkpointable offsets +final Map checkpointableOffsets = 
recordCollector.offsets(); +for (final Map.Entry entry : consumedOffsets.entrySet()) { +if (!checkpointableOffsets.containsKey(entry.getKey())) { +checkpointableOffsets.put(entry.getKey(), entry.getValue()); +} +} + +return checkpointableOffsets; } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java new file mode 100644 index 000..54c2bd7ede4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use
[jira] [Updated] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
[ https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6360: --- Fix Version/s: 1.0.2 0.11.0.3 0.10.2.2 > RocksDB segments not removed when store is closed causes re-initialization to > fail > -- > > Key: KAFKA-6360 > URL: https://issues.apache.org/jira/browse/KAFKA-6360 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Damian Guy >Assignee: Damian Guy >Priority: Blocker > Fix For: 0.10.2.2, 1.1.0, 0.11.0.3, 1.0.2 > > > When a store is re-initialized it is first closed, before it is opened again. > When this happens the segments in the {{Segments}} class are closed, but they > are not removed from the list of segments. So when the store is > re-initialized the old closed segments are used. This results in: > {code} > [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] > task [1_3] Failed to flush state store > KSTREAM-AGGREGATE-STATE-STORE-24: > (org.apache.kafka.streams.processor.internals.ProcessorStateManager) > org.apache.kafka.streams.errors.InvalidStateStoreException: Store > KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed > at > org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) > at > org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) > at > org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) > at > 
org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) > at > org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) > at > org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
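The failure mode described in KAFKA-6360 (segments are closed but stay in the list, so re-initialization hands back closed segments) can be reproduced in a few lines. The sketch below is a simplified stand-in, not the real `Segments` class: `SegmentRegistry`, `Segment`, and `getOrCreate` are hypothetical names, and the real fix may differ in detail.

```java
import java.util.Map;
import java.util.TreeMap;

final class SegmentRegistry {
    // Minimal stand-in for one store segment; only tracks open/closed state.
    static final class Segment {
        private boolean open = true;
        void close() { open = false; }
        boolean isOpen() { return open; }
    }

    private final Map<Long, Segment> segments = new TreeMap<>();

    // Returns the segment for this id, creating a fresh open one if none is registered.
    Segment getOrCreate(long segmentId) {
        return segments.computeIfAbsent(segmentId, id -> new Segment());
    }

    // Closes every segment AND forgets it. Without the clear() call, a later
    // getOrCreate() would return the old, closed instance.
    void close() {
        for (Segment segment : segments.values()) {
            segment.close();
        }
        segments.clear();
    }

    public static void main(String[] args) {
        SegmentRegistry registry = new SegmentRegistry();
        Segment before = registry.getOrCreate(1L);
        registry.close();
        Segment after = registry.getOrCreate(1L);
        System.out.println("old open=" + before.isOpen() + ", new open=" + after.isOpen());
    }
}
```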
[jira] [Updated] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
[ https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6360: --- Affects Version/s: 0.10.2.1 0.11.0.2 1.0.1 > RocksDB segments not removed when store is closed causes re-initialization to > fail > -- > > Key: KAFKA-6360 > URL: https://issues.apache.org/jira/browse/KAFKA-6360 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.2, 1.1.0, 1.0.1 >Reporter: Damian Guy >Assignee: Damian Guy >Priority: Blocker > Fix For: 1.1.0 > > > When a store is re-initialized it is first closed, before it is opened again. > When this happens the segments in the {{Segments}} class are closed, but they > are not removed from the list of segments. So when the store is > re-initialized the old closed segments are used. This results in: > {code} > [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] > task [1_3] Failed to flush state store > KSTREAM-AGGREGATE-STATE-STORE-24: > (org.apache.kafka.streams.processor.internals.ProcessorStateManager) > org.apache.kafka.streams.errors.InvalidStateStoreException: Store > KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed > at > org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) > at > org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) > at > org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) > at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) > at > org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) > at > org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent
[ https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-7055. Resolution: Fixed Fix Version/s: 2.0.0 > Kafka Streams Processor API allows you to add sinks and processors without > parent > - > > Key: KAFKA-7055 > URL: https://issues.apache.org/jira/browse/KAFKA-7055 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Nikki Thean >Assignee: Nikki Thean >Priority: Minor > Labels: easyfix > Fix For: 2.0.0 > > > The Kafka Streams Processor API allows you to define a Topology and connect > sources, processors, and sinks. From reading through the code, it seems that > you cannot forward a message to a downstream node unless it is explicitly > connected to the upstream node (from which you are forwarding the message) as > a child. Here is an example where you forward using the name of the downstream node > rather than the child index > ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L117]). > However, I've been able to connect processors and sinks to the topology > without including parent names, i.e., with an empty vararg, using this method: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Topology.java#L423]. > As any attempt to forward a message to those nodes will throw a > StreamsException, I suggest throwing an exception if a processor or sink is > added without at least one upstream node. There is a method in > `InternalTopologyBuilder` that allows you to connect processors by name after > you add them to the topology, but it is not part of the external Processor > API.
> In addition (or alternatively), I suggest making [the error message for when > users try to forward messages to a node that is not > connected|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L119] > more descriptive, like this one for when a user attempts to access a state > store that is not connected to the processor: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L75-L81] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
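The check suggested in KAFKA-7055 (reject a processor or sink that is added with an empty parent vararg) can be sketched as follows. This is a toy model under stated assumptions: `TinyTopology` is a hypothetical class, and `IllegalArgumentException` stands in for whatever exception type the real builder throws.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class TinyTopology {
    private final Set<String> nodes = new HashSet<>();

    // Sources are the only nodes allowed to have no parent.
    void addSource(String name) {
        register(name);
    }

    // Calling addProcessor("x") with no parents passes an empty array here,
    // which is exactly the case the issue wants rejected up front.
    void addProcessor(String name, String... parentNames) {
        Objects.requireNonNull(parentNames, "parent names must not be null");
        if (parentNames.length == 0) {
            throw new IllegalArgumentException("Processor " + name + " must have at least one parent");
        }
        for (String parent : parentNames) {
            if (!nodes.contains(parent)) {
                throw new IllegalArgumentException("Unknown parent " + parent + " for " + name);
            }
        }
        register(name);
    }

    private void register(String name) {
        Objects.requireNonNull(name, "name must not be null");
        if (!nodes.add(name)) {
            throw new IllegalArgumentException("Node " + name + " is already added");
        }
    }

    public static void main(String[] args) {
        TinyTopology topology = new TinyTopology();
        topology.addSource("source");
        topology.addProcessor("upper-case", "source");
        try {
            topology.addProcessor("orphan");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at build time gives the user the node name in the message, rather than a later failure when a record is first forwarded.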
[jira] [Commented] (KAFKA-7055) Kafka Streams Processor API allows you to add sinks and processors without parent
[ https://issues.apache.org/jira/browse/KAFKA-7055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513283#comment-16513283 ] ASF GitHub Bot commented on KAFKA-7055: --- mjsax closed pull request #5215: KAFKA-7055: Update InternalTopologyBuilder to throw TopologyException if a processor or sink is added with no upstream node attached URL: https://github.com/apache/kafka/pull/5215 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7356aff153f..e7dabbf649c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -580,7 +580,6 @@ public boolean test(final K1 key, final V1 value) { final String name = builder.newProcessorName(leftJoin ? 
LEFTJOIN_NAME : JOIN_NAME); builder.internalTopologyBuilder.addProcessor(name, new KStreamKTableJoin<>(((KTableImpl) other).valueGetterSupplier(), joiner, leftJoin), this.name); builder.internalTopologyBuilder.connectProcessorAndStateStores(name, ((KTableImpl) other).valueGetterSupplier().storeNames()); -builder.internalTopologyBuilder.connectProcessors(this.name, ((KTableImpl) other).name); return new KStreamImpl<>(builder, name, allSourceNodes, false); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 36a2edc6766..5b4b4d737b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -442,6 +442,11 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset, final String... predecessorNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(topic, "topic must not be null"); +Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); +if (predecessorNames.length == 0) { +throw new TopologyException("Sink " + name + " must have at least one parent"); +} + addSink(name, new StaticTopicNameExtractor(topic), keySerializer, valSerializer, partitioner, predecessorNames); nodeToSinkTopic.put(name, topic); } @@ -454,9 +459,13 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset, final String... 
predecessorNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(topicExtractor, "topic extractor must not be null"); +Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); if (nodeFactories.containsKey(name)) { throw new TopologyException("Processor " + name + " is already added."); } +if (predecessorNames.length == 0) { +throw new TopologyException("Sink " + name + " must have at least one parent"); +} for (final String predecessor : predecessorNames) { Objects.requireNonNull(predecessor, "predecessor name can't be null"); @@ -481,9 +490,13 @@ public final void addProcessor(final String name, final String... predecessorNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(supplier, "supplier must not be null"); +Objects.requireNonNull(predecessorNames, "predecessor names must not be null"); if (nodeFactories.containsKey(name)) { throw new TopologyException("Processor " + name + " is already added."); } +if (predecessorNames.length == 0) { +throw new TopologyException("Processor " + name + " must have at least one parent"); +} for (final String predecessor : predecessorNames) { Objects.requireNonNull(predecessor, "predecessor name must not be null"); @@ -592,21 +605,6 @@ public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder, storeToSourceChangelogTopic.put(storeBuilder, topic); } -public final void connectProcessors(final String... processorNames) { -if (processorNames.length < 2) { -throw new TopologyException("At least two processors need to participate in the
[jira] [Commented] (KAFKA-7060) Command-line overrides for ConnectDistributed worker properties
[ https://issues.apache.org/jira/browse/KAFKA-7060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513158#comment-16513158 ] ASF GitHub Bot commented on KAFKA-7060: --- kevin-laff opened a new pull request #5234: KAFKA-7060: Add override arguments to ConnectDistributed command line URL: https://github.com/apache/kafka/pull/5234 Allow ConnectDistributed to accept an unlimited number of --override key=value command-line arguments. This is an implementation of KIP-316. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Command-line overrides for ConnectDistributed worker properties > --- > > Key: KAFKA-7060 > URL: https://issues.apache.org/jira/browse/KAFKA-7060 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Kevin Lafferty >Priority: Major > > This Jira is for tracking the implementation for > [KIP-316|https://cwiki.apache.org/confluence/display/KAFKA/KIP-316%3A+Command-line+overrides+for+ConnectDistributed+worker+properties]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
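As a rough illustration of what accepting repeated `--override key=value` arguments could look like, here is a small parser. It is a sketch only and is not taken from PR #5234: the class name `OverrideArgs`, the error messages, and the choice to ignore other arguments are all assumptions.

```java
import java.util.Properties;

final class OverrideArgs {
    // Collects every "--override key=value" pair from the argument list.
    // Arguments that are not part of an override pair are ignored here;
    // a later override of the same key wins.
    static Properties parse(String[] args) {
        Properties overrides = new Properties();
        for (int i = 0; i < args.length; i++) {
            if (!"--override".equals(args[i])) {
                continue;
            }
            if (i + 1 >= args.length) {
                throw new IllegalArgumentException("--override requires a key=value argument");
            }
            String pair = args[++i];
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("expected key=value, got: " + pair);
            }
            overrides.setProperty(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return overrides;
    }

    public static void main(String[] args) {
        Properties overrides = parse(new String[] {
            "worker.properties", "--override", "group.id=connect-a", "--override", "rest.port=8084"
        });
        System.out.println(overrides.getProperty("group.id") + " " + overrides.getProperty("rest.port"));
    }
}
```

The overrides would then typically be layered on top of the properties loaded from the worker file, so that command-line values take precedence.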
[jira] [Updated] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-6975: --- Fix Version/s: 1.1.1 > AdminClient.deleteRecords() may cause replicas unable to fetch from beginning > - > > Key: KAFKA-6975 > URL: https://issues.apache.org/jira/browse/KAFKA-6975 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Anna Povzner >Priority: Blocker > Fix For: 2.0.0, 1.1.1 > > > AdminClient.deleteRecords(beforeOffset(offset)) will set the log start offset to > the requested offset. If the requested offset is in the middle of the batch, > the replica will not be able to fetch from that offset (because it is in the > middle of the batch). > One use-case where this could cause problems is replica re-assignment. > Suppose we have a topic partition with 3 initial replicas, and at some point > the user issues AdminClient.deleteRecords() for the offset that falls in the > middle of the batch. It now becomes the log start offset for this topic > partition. Suppose at some later time, the user starts partition > re-assignment to 3 new replicas. The new replicas (followers) will start with > HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < > log start offset (LSO); the follower will be able to reset offset to LSO of > the leader and fetch LSO; the leader will send a batch in response with base > offset stop the fetcher thread. The end result is that the new replicas will not be > able to start fetching unless LSO moves to an offset that is not in the > middle of the batch, and the re-assignment will be stuck for possibly a > very long time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
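The condition at the heart of KAFKA-6975 is whether a requested offset lands strictly inside a record batch rather than on a batch boundary. A minimal sketch of that check follows; the `long[][]` layout of `{baseOffset, lastOffset}` pairs and the name `isMidBatch` are assumptions made for illustration and do not correspond to any broker API.

```java
final class BatchBoundary {
    // Each row is {baseOffset, lastOffset} of one record batch, both inclusive
    // (a hypothetical, simplified view of a partition log).
    // Returns true when the offset falls after a batch's base offset but not
    // past its last offset, i.e. in the middle of that batch.
    static boolean isMidBatch(long[][] batches, long offset) {
        for (long[] batch : batches) {
            long base = batch[0];
            long last = batch[1];
            if (offset > base && offset <= last) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        long[][] batches = { {0, 4}, {5, 9} };
        System.out.println(isMidBatch(batches, 7)); // 7 lies inside the batch 5..9
        System.out.println(isMidBatch(batches, 5)); // 5 is the base offset of a batch
    }
}
```

With these two batches, a log start offset of 7 is the problematic case from the description, while 5 sits on a batch boundary.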
[jira] [Commented] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
[ https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513135#comment-16513135 ] Matthias J. Sax commented on KAFKA-5054: There was a PR that got merged IMHO. I think we can close this with fixed version 2.0. \cc [~damianguy] [~guozhang] > ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized > > > Key: KAFKA-5054 > URL: https://issues.apache.org/jira/browse/KAFKA-5054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Damian Guy >Assignee: Damian Guy >Priority: Critical > Fix For: 2.1.0 > > > {{putIfAbsent}} and {{delete}} should be synchronized as they involve at > least 2 operations on the underlying store and may result in inconsistent > results if someone were to query via IQ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
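The concern in KAFKA-5054 is that `putIfAbsent` and `delete` each perform more than one operation on the underlying store, so a concurrent reader could observe an intermediate state. A minimal sketch of guarding those compound operations with the object's monitor is shown below. `ChangeLoggingStoreSketch` is a hypothetical, in-memory stand-in; it is not the real `ChangeLoggingKeyValueBytesStore`, and the string-based changelog is only there to make the second operation visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangeLoggingStoreSketch {
    private final Map<String, String> inner = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    // Read, conditional write, and log append happen under one lock,
    // so a concurrent get() never sees the store between those steps.
    synchronized String putIfAbsent(String key, String value) {
        String existing = inner.get(key);
        if (existing == null) {
            inner.put(key, value);
            changelog.add(key + "=" + value);
        }
        return existing;
    }

    // Remove and log append are likewise treated as one atomic step.
    synchronized String delete(String key) {
        String old = inner.remove(key);
        changelog.add(key + "=null");
        return old;
    }

    // Readers take the same lock, modelling an interactive query.
    synchronized String get(String key) {
        return inner.get(key);
    }

    synchronized int changelogSize() {
        return changelog.size();
    }

    public static void main(String[] args) {
        ChangeLoggingStoreSketch store = new ChangeLoggingStoreSketch();
        store.putIfAbsent("k", "v1");
        store.putIfAbsent("k", "v2"); // no effect, key already present
        System.out.println(store.get("k"));
    }
}
```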
[jira] [Created] (KAFKA-7060) Command-line overrides for ConnectDistributed worker properties
Kevin Lafferty created KAFKA-7060: - Summary: Command-line overrides for ConnectDistributed worker properties Key: KAFKA-7060 URL: https://issues.apache.org/jira/browse/KAFKA-7060 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Kevin Lafferty This Jira is for tracking the implementation for [KIP-316|https://cwiki.apache.org/confluence/display/KAFKA/KIP-316%3A+Command-line+overrides+for+ConnectDistributed+worker+properties]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513125#comment-16513125 ] ASF GitHub Bot commented on KAFKA-6711: --- mjsax closed pull request #4789: KAFKA-6711: GlobalStateManagerImpl should not write offsets URL: https://github.com/apache/kafka/pull/4789 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 6052f96053c..be160bd5a33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -167,7 +167,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, for (final TopicPartition topicPartition : topicPartitions) { consumer.assign(Collections.singletonList(topicPartition)); final Long checkpoint = checkpointableOffsets.get(topicPartition); -if (checkpoint != null) { +if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) { consumer.seek(topicPartition, checkpoint); } else { consumer.seekToBeginning(Collections.singletonList(topicPartition)); @@ -249,10 +249,33 @@ public void close(final Map offsets) throws IOException { @Override public void checkpoint(final Map offsets) { + +// Find non persistent store's topics +final Map storeToChangelogTopic = topology.storeToChangelogTopic(); +final Set globalNonPersistentStoresTopics = new HashSet<>(); +for (final StateStore store : topology.globalStateStores()) { +if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) { + 
globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); +} +} + checkpointableOffsets.putAll(offsets); -if (!checkpointableOffsets.isEmpty()) { + +final Map filteredOffsets = new HashMap<>(); + +// Skip non persistent store +for (final Map.Entry topicPartitionOffset : checkpointableOffsets.entrySet()) { +final String topic = topicPartitionOffset.getKey().topic(); +if (globalNonPersistentStoresTopics.contains(topic)) { +filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT); +} else { +filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); +} +} + +if (!filteredOffsets.isEmpty()) { try { -checkpoint.write(checkpointableOffsets); +checkpoint.write(filteredOffsets); } catch (IOException e) { log.warn("Failed to write offsets checkpoint for global stores", e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index e9d61f5ad64..7e838763031 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -68,12 +68,13 @@ private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener(); private final TopicPartition t1 = new TopicPartition("t1", 1); private final TopicPartition t2 = new TopicPartition("t2", 1); +private final TopicPartition t3 = new TopicPartition("t3", 1); private GlobalStateManagerImpl stateManager; private NoOpProcessorContext context; private StateDirectory stateDirectory; private String stateDirPath; private NoOpReadOnlyStore store1; -private NoOpReadOnlyStore store2; +private NoOpReadOnlyStore store2, store3; private MockConsumer consumer; private File checkpointFile; private ProcessorTopology topology; @@ -83,18 +84,21 @@ 
public void before() throws IOException { final Map storeToTopic = new HashMap<>(); storeToTopic.put("t1-store", "t1"); storeToTopic.put("t2-store", "t2"); +storeToTopic.put("t3-store", "t3"); final Map storeToProcessorNode = new HashMap<>(); -store1 = new NoOpReadOnlyStore<>("t1-store"); +store1 = new NoOpReadOnlyStore<>("t1-store", true);
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513124#comment-16513124 ] ASF GitHub Bot commented on KAFKA-6711: --- mjsax closed pull request #4782: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-mem… URL: https://github.com/apache/kafka/pull/4782 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 56e6bed0850..17ae70ce0d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -41,6 +41,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -242,7 +243,7 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); final Long checkpoint = checkpointableOffsets.get(topicPartition); -if (checkpoint != null) { +if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT) { globalConsumer.seek(topicPartition, checkpoint); } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); @@ -334,10 +335,33 @@ public void close(final Map offsets) throws IOException { @Override public void checkpoint(final Map offsets) { + +// Find non persistent store's topics +final Map storeToChangelogTopic = 
topology.storeToChangelogTopic(); +final Set globalNonPersistentStoresTopics = new HashSet<>(); +for (final StateStore store : topology.globalStateStores()) { +if (!store.persistent() && storeToChangelogTopic.containsKey(store.name())) { + globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name())); +} +} + checkpointableOffsets.putAll(offsets); -if (!checkpointableOffsets.isEmpty()) { + +final Map filteredOffsets = new HashMap<>(); + +// Skip non persistent store +for (final Map.Entry topicPartitionOffset : checkpointableOffsets.entrySet()) { +final String topic = topicPartitionOffset.getKey().topic(); +if (globalNonPersistentStoresTopics.contains(topic)) { +filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT); +} else { +filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue()); +} +} + +if (!filteredOffsets.isEmpty()) { try { -checkpoint.write(checkpointableOffsets); +checkpoint.write(filteredOffsets); } catch (IOException e) { log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index df8d2010d24..39b7bb4747f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -488,6 +488,16 @@ public void shouldCheckpointRestoredOffsetsToFile() throws IOException { assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap)); } +@Test +public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException { +stateManager.initialize(); +initializeConsumer(10, 1, t3); +stateManager.register(store3, stateRestoreCallback); +stateManager.close(Collections.emptyMap()); + 
+assertThat(readOffsetsCheckpoint(), equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT))); +} + private Map readOffsetsCheckpoint() throws IOException { final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ProcessorStateManager.CHECKPOINT_FILE_NAME)); diff --git
[jira] [Assigned] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char
[ https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Al-Mehdi reassigned KAFKA-5098: - Assignee: Ahmed Al-Mehdi (was: huxihx) > KafkaProducer.send() blocks and generates TimeoutException if topic name has > illegal char > - > > Key: KAFKA-5098 > URL: https://issues.apache.org/jira/browse/KAFKA-5098 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.10.2.0 > Environment: Java client running against server using > wurstmeister/kafka Docker image. >Reporter: Jeff Larsen >Assignee: Ahmed Al-Mehdi >Priority: Major > Fix For: 2.1.0 > > > The server is running with auto create enabled. If we try to publish to a > topic with a forward slash in the name, the call blocks and we get a > TimeoutException in the Callback. I would expect it to return immediately > with an InvalidTopicException. > There are other blocking issues that have been reported which may be related > to some degree, but this particular cause seems unrelated. 
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka.example.com:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
>     String separator = "/";
>     //String separator = "_";
>     try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>       System.out.println("Calling KafkaProducer.send() at " + new Date());
>       producer.send(
>           new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>               "Not expecting a TimeoutException here"),
>           new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e) {
>               if (e != null) {
>                 System.out.println(e.toString());
>               }
>             }
>           });
>       System.out.println("KafkaProducer.send() completed at " + new Date());
>     }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the
> TimeoutException we encountered did not help point us in the right direction.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
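Until the producer rejects such names up front, a caller can check the topic name before calling send(). The sketch below is only an illustration: `TopicNameCheck` is a hypothetical helper, not part of the Kafka client API, and the rules it encodes (ASCII letters, digits, '.', '_' and '-', at most 249 characters, not "." or "..") are an assumption about the broker's topic naming rules that should be verified against the Kafka version in use.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the Kafka client API.
final class TopicNameCheck {
    // Assumed rule: only ASCII letters, digits, '.', '_' and '-' are legal.
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");
    // Assumed upper bound on the length of a topic name.
    private static final int MAX_LENGTH = 249;

    static boolean isLegal(String topic) {
        if (topic == null || topic.isEmpty() || topic.length() > MAX_LENGTH) {
            return false;
        }
        if (topic.equals(".") || topic.equals("..")) {
            return false;
        }
        return LEGAL_CHARS.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // The name from the report contains a forward slash and is rejected.
        System.out.println(isLegal("abc/someStreamName")); // false
        // The underscore variant passes the check.
        System.out.println(isLegal("abc_someStreamName")); // true
    }
}
```

Failing fast on the client side gives the caller an immediate, descriptive error instead of a TimeoutException after `max.block.ms` has elapsed.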
[jira] [Updated] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6711: --- Fix Version/s: 1.1.1 1.0.2 0.11.0.3 0.10.2.2 > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1 > > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
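The idea behind the fix quoted in the pull request diff for this issue can be sketched in isolation: offsets that belong to topics backing non-persistent (in-memory) global stores are replaced by a "no checkpoint" sentinel before the checkpoint file is written, so that those stores are restored from the beginning after a restart. The sketch below is a simplification under stated assumptions: it keys offsets by topic name rather than by TopicPartition, `CheckpointFilterSketch` is a hypothetical class, and the sentinel value -1 is an assumption standing in for `StateRestorer.NO_CHECKPOINT`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the filtering step in checkpoint().
final class CheckpointFilterSketch {
    // Assumed sentinel meaning "there is no usable checkpoint for this topic".
    static final long NO_CHECKPOINT = -1L;

    // Returns a copy of the offsets in which every topic backing a
    // non-persistent store is mapped to NO_CHECKPOINT.
    static Map<String, Long> filter(Map<String, Long> offsets, Set<String> nonPersistentTopics) {
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            if (nonPersistentTopics.contains(entry.getKey())) {
                filtered.put(entry.getKey(), NO_CHECKPOINT);
            } else {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("in-memory-topic", 42L);
        offsets.put("persistent-topic", 7L);
        Map<String, Long> result = filter(offsets, Collections.singleton("in-memory-topic"));
        System.out.println(result.get("in-memory-topic"));  // -1
        System.out.println(result.get("persistent-topic")); // 7
    }
}
```

On restore, a checkpoint that is not greater than the sentinel then leads to seeking to the beginning of the topic, which matches the changed condition in `restoreState()` in the quoted diff.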
[jira] [Commented] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char
[ https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513059#comment-16513059 ] Ahmed Al-Mehdi commented on KAFKA-5098: --- Since there was no response from the current Jira owner, I went ahead and created a PR for this issue. > KafkaProducer.send() blocks and generates TimeoutException if topic name has > illegal char > - > > Key: KAFKA-5098 > URL: https://issues.apache.org/jira/browse/KAFKA-5098 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.10.2.0 > Environment: Java client running against server using > wurstmeister/kafka Docker image. >Reporter: Jeff Larsen >Assignee: huxihx >Priority: Major > Fix For: 2.1.0 > > > The server is running with auto create enabled. If we try to publish to a > topic with a forward slash in the name, the call blocks and we get a > TimeoutException in the Callback. I would expect it to return immediately > with an InvalidTopicException. > There are other blocking issues that have been reported which may be related > to some degree, but this particular cause seems unrelated. 
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka.example.com:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
>     String separator = "/";
>     //String separator = "_";
>     try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>       System.out.println("Calling KafkaProducer.send() at " + new Date());
>       producer.send(
>           new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>               "Not expecting a TimeoutException here"),
>           new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e) {
>               if (e != null) {
>                 System.out.println(e.toString());
>               }
>             }
>           });
>       System.out.println("KafkaProducer.send() completed at " + new Date());
>     }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the
> TimeoutException we encountered did not help point us in the right direction.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1
[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513045#comment-16513045 ] Ted Yu commented on KAFKA-7012: --- bq. not treating this case at all and suffering the (rare?) timeout? Should be acceptable before better solution (with no performance penalty) is found. > Performance issue upgrading to kafka 1.0.1 or 1.1 > - > > Key: KAFKA-7012 > URL: https://issues.apache.org/jira/browse/KAFKA-7012 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: rajadayalan perumalsamy >Assignee: praveen >Priority: Major > Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, > Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, > Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, > Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png > > > We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. > After upgrading 1 node on the cluster, we notice that network threads use > most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. > With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% > vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is > high depending on the number of network threads used. If networks threads is > set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 > then the cpu usage is around 450%(5 vcpus). Using the same kafka > server.properties for both. > Did further analysis with git bisect, couple of build and deploys, traced the > issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine > for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit > 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have > attached screenshots of profiling done with both the commits. 
Screenshot > Commit-f15cdbc91b-profile shows less cpu usage by network threads and > Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show > higher cpu usage(almost entire cpu usage) by network threads. Also noticed > that kafka.network.Processor.poll() method is invoked 10 times more with > commit 47ee8e954df62b9a79099e944ec4be29afe046f6. > We need the issue to be resolved to upgrade the cluster. Please let me know > if you need any additional information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1
[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513037#comment-16513037 ]

radai rosenblatt edited comment on KAFKA-7012 at 6/14/18 9:46 PM:
------------------------------------------------------------------

I don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more complicated condition for when a key (channel) gets picked into `keysWithBufferedRead`. The condition is currently

{code}
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
{code}

which results in lots of "false positives": keys that have something left over in SSL buffers (likely, since request sizes are rarely a multiple of SSL cipher block sizes) and so make the next poll() cycle inefficient. The condition needs to check whether a channel has something left *that could not be read out due to memory pressure*.

Alternatively, the doomsday scenario this is meant to handle is pretty rare: if a channel has a request fully inside the SSL buffers that cannot be read due to memory pressure, *and* the underlying channel will never have any more incoming bytes (so will never come back from select), the request will sit there and rot, resulting in a client timeout. The alternative to making the condition more complicated is not treating this case at all and suffering the (rare?) timeout?

> Performance issue upgrading to kafka 1.0.1 or 1.1 > - > > Key: KAFKA-7012 > URL: https://issues.apache.org/jira/browse/KAFKA-7012 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: rajadayalan perumalsamy >Assignee: praveen >Priority: Major > Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, > Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, > Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, > Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png > > > We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. > After upgrading 1 node on the cluster, we notice that network threads use > most of the cpu.
It is a 3 node cluster with 15k messages/sec on each node. > With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% > vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is > high depending on the number of network threads used. If networks threads is > set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 > then the cpu usage is around 450%(5 vcpus). Using the same kafka > server.properties for both. > Did further analysis with git bisect, couple of build and deploys, traced the > issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU
[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1
[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513037#comment-16513037 ]

radai rosenblatt commented on KAFKA-7012:
-----------------------------------------

I don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more complicated condition for when a key (channel) gets picked into `keysWithBufferedRead`. The condition is currently

```java
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
```

which results in lots of "false positives": keys that have something left over in SSL buffers (likely, since request sizes are rarely a multiple of SSL cipher block sizes) and so make the next poll() cycle inefficient. The condition needs to check whether a channel has something left *that could not be read out due to memory pressure*.

Alternatively, the doomsday scenario this is meant to handle is pretty rare: if a channel has a request fully inside the SSL buffers that cannot be read due to memory pressure, *and* the underlying channel will never have any more incoming bytes (so will never come back from select), the request will sit there and rot, resulting in a client timeout. The alternative to making the condition more complicated is not treating this case at all and suffering the (rare?) timeout?

> Performance issue upgrading to kafka 1.0.1 or 1.1 > - > > Key: KAFKA-7012 > URL: https://issues.apache.org/jira/browse/KAFKA-7012 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: rajadayalan perumalsamy >Assignee: praveen >Priority: Major > Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, > Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, > Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, > Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png > > > We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. > After upgrading 1 node on the cluster, we notice that network threads use > most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. > With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% > vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is > high depending on the number of network threads used. If networks threads is > set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 > then the cpu usage is around 450%(5 vcpus). Using the same kafka > server.properties for both. > Did further analysis with git bisect, couple of build and deploys, traced the > issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine > for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit > 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have > attached screenshots of profiling done with both the commits. Screenshot > Commit-f15cdbc91b-profile shows less cpu usage by network threads and > Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show > higher cpu usage(almost entire cpu usage) by network threads. Also noticed > that kafka.network.Processor.poll() method is invoked 10 times more with > commit 47ee8e954df62b9a79099e944ec4be29afe046f6. > We need the issue to be resolved to upgrade the cluster. 
Please let me know > if you need any additional information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
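The stricter condition described in the comment above could look roughly like the following sketch. It is not the Kafka implementation: `BufferedReadSketch`, its `Channel` interface and in particular `readBlockedByMemoryPressure()` are hypothetical names introduced only for illustration; only `hasBytesBuffered()` appears in the quoted snippet. The point is that a channel is remembered for the next poll() only when buffered bytes exist *and* the last read stopped because no memory was available.

```java
// Hypothetical sketch; none of these types are Kafka classes.
final class BufferedReadSketch {
    // Minimal view of a channel for the purpose of this example.
    interface Channel {
        // Bytes are sitting in intermediary (e.g. SSL) buffers.
        boolean hasBytesBuffered();
        // Hypothetical: the last read attempt stopped because no memory was available.
        boolean readBlockedByMemoryPressure();
    }

    // Remember the channel only if its leftover bytes could not be read due to memory pressure.
    static boolean shouldRememberForNextPoll(Channel channel) {
        return channel.hasBytesBuffered() && channel.readBlockedByMemoryPressure();
    }

    // Small factory for fixed-answer channels used in the demo below.
    static Channel channel(final boolean buffered, final boolean blocked) {
        return new Channel() {
            @Override
            public boolean hasBytesBuffered() {
                return buffered;
            }

            @Override
            public boolean readBlockedByMemoryPressure() {
                return blocked;
            }
        };
    }

    public static void main(String[] args) {
        // Leftover SSL bytes without memory pressure: no longer a "false positive".
        System.out.println(shouldRememberForNextPoll(channel(true, false))); // false
        // Leftover bytes that could not be read for lack of memory: still remembered.
        System.out.println(shouldRememberForNextPoll(channel(true, true)));  // true
    }
}
```

Whether the extra state needed to answer the second question is cheap enough to track on every read is exactly the trade-off raised in the comment.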
[jira] [Commented] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513030#comment-16513030 ] ASF GitHub Bot commented on KAFKA-6975: --- hachikuji closed pull request #5229: KAFKA-6975: Fix replica fetching from non-batch-aligned log start offset URL: https://github.com/apache/kafka/pull/5229 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 68faf00c079..9339d29202f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.yammer.metrics.core.Gauge import kafka.api.LeaderAndIsr import kafka.api.Request +import kafka.common.UnexpectedAppendOffsetException import kafka.controller.KafkaController import kafka.log.{LogAppendInfo, LogConfig} import kafka.metrics.KafkaMetricsGroup @@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zk.AdminZkClient import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} +import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.record.MemoryRecords @@ -187,6 +188,10 @@ class Partition(val topic: String, def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId)) + def 
getReplicaOrException(replicaId: Int = localBrokerId): Replica = +getReplica(replicaId).getOrElse( + throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition")) + def leaderReplicaIfLocal: Option[Replica] = leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica) @@ -549,15 +554,41 @@ class Partition(val topic: String, laggingReplicas } - def appendRecordsToFutureReplica(records: MemoryRecords) { - getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records) + private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = { + if (isFuture) + getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records) + else { +// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread +// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. +inReadLock(leaderIsrUpdateLock) { + getReplicaOrException().log.get.appendAsFollower(records) +} + } } - def appendRecordsToFollower(records: MemoryRecords) { -// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread -// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. 
-inReadLock(leaderIsrUpdateLock) { - getReplica().get.log.get.appendAsFollower(records) + def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) { +try { + doAppendRecordsToFollowerOrFutureReplica(records, isFuture) +} catch { + case e: UnexpectedAppendOffsetException => +val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException() +val logEndOffset = replica.logEndOffset.messageOffset +if (logEndOffset == replica.logStartOffset && +e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) { + // This may happen if the log start offset on the leader (or current replica) falls in + // the middle of the batch due to delete records request and the follower tries to + // fetch its first offset from the leader. + // We handle this case here instead of Log#append() because we will need to remove the + // segment that start with log start offset and create a new one with earlier offset + // (base offset of the batch), which will move recoveryPoint backwards, so we will need + // to checkpoint the new recovery point before we append + val replicaName = if (isFuture) "future replica" else "follower" + info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset
[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct
[ https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513013#comment-16513013 ] ASF GitHub Bot commented on KAFKA-7021: --- guozhangwang opened a new pull request #5232: KAFKA-7021: checkpoint offsets from committed URL: https://github.com/apache/kafka/pull/5232 This is a cherry-pick PR from https://github.com/apache/kafka/pull/5207 1) add the committed offsets to the checkpointable offset map. 2) add the restoration integration test for the source KTable case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Source KTable checkpoint is not correct > --- > > Key: KAFKA-7021 > URL: https://issues.apache.org/jira/browse/KAFKA-7021 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Matthias J. Sax >Assignee: Guozhang Wang >Priority: Major > Fix For: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1 > > > Kafka Streams treats source KTables, i.e., tables created via `builder.table()`, > differently. Instead of creating a changelog topic, the original source topic > is used to avoid unnecessary data redundancy. > However, Kafka Streams does not write a correct local state checkpoint file. > This results in unnecessary state restore after a rebalance. Instead of the > latest committed offset, the latest restored offset is written into the > checkpoint file in `ProcessorStateManager#close()`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
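Step 1 of the pull request ("add the committed offsets to the checkpointable offset map") can be illustrated with a small, self-contained sketch. This is not the code from the PR: `SourceTableCheckpointSketch` is a hypothetical class, offsets are keyed by a plain string instead of a TopicPartition, and the rule that a committed offset overrides a restored offset for the same partition is an assumption drawn from the issue description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of combining restored and committed offsets.
final class SourceTableCheckpointSketch {
    // Starts from the restored offsets and lets committed offsets take precedence,
    // so the checkpoint reflects how far the source topic was actually consumed.
    static Map<String, Long> checkpointableOffsets(Map<String, Long> restored,
                                                   Map<String, Long> committed) {
        Map<String, Long> result = new HashMap<>(restored);
        result.putAll(committed);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("source-topic-0", 100L); // offset reached during the last restore
        Map<String, Long> committed = new HashMap<>();
        committed.put("source-topic-0", 250L); // offset committed while processing
        System.out.println(checkpointableOffsets(restored, committed).get("source-topic-0")); // 250
    }
}
```

With only the restored offset (100 in this example) in the checkpoint file, a restart would replay records 100 to 249 again, which is the unnecessary state restore described in the issue.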
[jira] [Resolved] (KAFKA-7010) Rename ResourceNameType.ANY to MATCH
[ https://issues.apache.org/jira/browse/KAFKA-7010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-7010. Resolution: Fixed Merged the PR to trunk and the 2.0 branch. > Rename ResourceNameType.ANY to MATCH > > > Key: KAFKA-7010 > URL: https://issues.apache.org/jira/browse/KAFKA-7010 > Project: Kafka > Issue Type: Sub-task > Components: core, security >Reporter: Andy Coates >Assignee: Andy Coates >Priority: Major > Fix For: 2.0.0 > > > Following on from PR #5117 (https://github.com/apache/kafka/pull/5117) and discussions with > Colin McCabe... > The current ResourceNameType.ANY may be misleading as it performs pattern > matching for wildcard and prefixed bindings, whereas ResourceName.ANY just > brings back any resource name. > Renaming to ResourceNameType.MATCH and adding more Javadoc should clear this > up. > Finally, ResourceNameType is no longer appropriate as the type is used in > ResourcePattern and ResourcePatternFilter. Hence rename to PatternType. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1
[ https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512935#comment-16512935 ] rajadayalan perumalsamy commented on KAFKA-7012: Thanks, are you planning a PR to change the behaviour of polling buffered keys? > Performance issue upgrading to kafka 1.0.1 or 1.1 > - > > Key: KAFKA-7012 > URL: https://issues.apache.org/jira/browse/KAFKA-7012 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: rajadayalan perumalsamy >Assignee: praveen >Priority: Major > Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, > Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, > Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, > Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png > > > We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. > After upgrading 1 node on the cluster, we notice that network threads use > most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. > With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% > vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is > high depending on the number of network threads used. If networks threads is > set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 > then the cpu usage is around 450%(5 vcpus). Using the same kafka > server.properties for both. > Did further analysis with git bisect, couple of build and deploys, traced the > issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine > for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit > 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have > attached screenshots of profiling done with both the commits. 
Screenshot > Commit-f15cdbc91b-profile shows less cpu usage by network threads and > Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show > higher cpu usage(almost entire cpu usage) by network threads. Also noticed > that kafka.network.Processor.poll() method is invoked 10 times more with > commit 47ee8e954df62b9a79099e944ec4be29afe046f6. > We need the issue to be resolved to upgrade the cluster. Please let me know > if you need any additional information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512881#comment-16512881 ] Matthias J. Sax commented on KAFKA-6583: Yes. KIP-268 is done. > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, in the need for clients to be more evenly balanced, stateful tasks > should be distributed in such a manner that it will be spread equally. > However, for such an awareness to be implemented during task assignment, it > would require the need for the present rebalance protocol metadata to also > contain the number of state stores in a particular task. This way, it will > allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-2983) Remove old Scala consumer and all related code, tests, and tools
[ https://issues.apache.org/jira/browse/KAFKA-2983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512864#comment-16512864 ] ASF GitHub Bot commented on KAFKA-2983: --- ijuma opened a new pull request #5230: KAFKA-2983: Remove Scala consumers and related code URL: https://github.com/apache/kafka/pull/5230 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Remove old Scala consumer and all related code, tests, and tools > > > Key: KAFKA-2983 > URL: https://issues.apache.org/jira/browse/KAFKA-2983 > Project: Kafka > Issue Type: Task >Reporter: Grant Henke >Assignee: Ismael Juma >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero
[ https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512826#comment-16512826 ] wade wu commented on KAFKA-6982: You can manually add that line of code and build Kafka. -- Best Regards 吴清俊|Wade Wu > java.lang.ArithmeticException: / by zero > > > Key: KAFKA-6982 > URL: https://issues.apache.org/jira/browse/KAFKA-6982 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.1.0 > Environment: Environment: Windows 10. >Reporter: wade wu >Priority: Major > Fix For: 1.1.1 > > > Producer keeps sending messages to Kafka, Kafka is down. > Server.log shows: > .. > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log) > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... 
(kafka.log.Log)
> [2018-06-01 17:01:34,664] ERROR Error while accepting connection (kafka.network.Acceptor)
> java.lang.ArithmeticException: / by zero
>     at kafka.network.Acceptor.run(SocketServer.scala:354)
>     at java.lang.Thread.run(Thread.java:748)
> [2018-06-01 17:01:34,664] ERROR Error while accepting connection (kafka.network.Acceptor)
> java.lang.ArithmeticException: / by zero
>     at kafka.network.Acceptor.run(SocketServer.scala:354)
>     at java.lang.Thread.run(Thread.java:748)
> [2018-06-01 17:01:34,664] ERROR Error while accepting connection (kafka.network.Acceptor)
> java.lang.ArithmeticException: / by zero
>     at kafka.network.Acceptor.run(SocketServer.scala:354)
>     at java.lang.Thread.run(Thread.java:748)
> ..
>
> This line of code in SocketServer.scala is causing the error:
> currentProcessor = currentProcessor % processors.size

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
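The failure mode in the line quoted above is an integer remainder with a zero divisor, which happens when the processor list is still empty. A minimal sketch of a guard is shown below in Java; it is not the change that was made in Kafka, `RoundRobinSketch` and `wrap` are hypothetical names, and returning -1 for "no processor available" is an assumption chosen for the example.

```java
// Hypothetical sketch of guarding a round-robin index against an empty processor list.
final class RoundRobinSketch {
    // Returns the wrapped index, or -1 if there are no processors to choose from.
    static int wrap(int currentProcessor, int processorCount) {
        if (processorCount <= 0) {
            return -1; // nothing to select yet; the caller must retry later
        }
        return currentProcessor % processorCount;
    }

    public static void main(String[] args) {
        try {
            int size = 0;
            // Unguarded form, equivalent to the line quoted in the report.
            System.out.println(5 % size);
        } catch (ArithmeticException e) {
            System.out.println(e.getMessage()); // / by zero
        }
        System.out.println(wrap(5, 0)); // -1
        System.out.println(wrap(5, 4)); // 1
    }
}
```

A caller that receives -1 would have to defer accepting the connection until at least one processor has been registered.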
[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero
[ https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512824#comment-16512824 ] Manikumar commented on KAFKA-6982: -- This is a minor issue that only occurs for a brief period during server startup. Kafka 1.1.1 will be out by month end. Alternatively, you can build the binary from the sources: https://github.com/apache/kafka/tree/1.1 > java.lang.ArithmeticException: / by zero > > > Key: KAFKA-6982 > URL: https://issues.apache.org/jira/browse/KAFKA-6982 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.1.0 > Environment: Environment: Windows 10. >Reporter: wade wu >Priority: Major > Fix For: 1.1.1 > > > Producer keeps sending messages to Kafka, Kafka is down. > Server.log shows: > .. > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log) > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... 
(kafka.log.Log) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > .. > > This line of code in SocketServer.scala causing the error: > {color:#33} currentProcessor = > currentProcessor{color:#d04437} % processors.size{color}{color} > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-2837) FAILING TEST: kafka.api.ProducerBounceTest > testBrokerFailure
[ https://issues.apache.org/jira/browse/KAFKA-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-2837. Resolution: Fixed Fix Version/s: (was: 0.10.0.0) 2.0.0 This test has been removed. > FAILING TEST: kafka.api.ProducerBounceTest > testBrokerFailure > --- > > Key: KAFKA-2837 > URL: https://issues.apache.org/jira/browse/KAFKA-2837 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 0.9.0.0 >Reporter: Gwen Shapira >Assignee: jin xing >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > {code} > java.lang.AssertionError > at org.junit.Assert.fail(Assert.java:86) > at org.junit.Assert.assertTrue(Assert.java:41) > at org.junit.Assert.assertTrue(Assert.java:52) > at > kafka.api.ProducerBounceTest.testBrokerFailure(ProducerBounceTest.scala:117) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50) > at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32) > at > org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106) > at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) > at > 
org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360) > at > org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54) > at > org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >
[jira] [Comment Edited] (KAFKA-6982) java.lang.ArithmeticException: / by zero
[ https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512810#comment-16512810 ] Martin M edited comment on KAFKA-6982 at 6/14/18 5:42 PM: -- Thanks [~omkreddy]. 1.1.1 is not officially available for download. How can we get the fix in 1.1.0? thanks was (Author: mar.ian): Thanks [~omkreddy]. 1.1.1 is not officially available for download. How can we get the fix in 1.1.1? thanks > java.lang.ArithmeticException: / by zero > > > Key: KAFKA-6982 > URL: https://issues.apache.org/jira/browse/KAFKA-6982 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.1.0 > Environment: Environment: Windows 10. >Reporter: wade wu >Priority: Major > Fix For: 1.1.1 > > > Producer keeps sending messages to Kafka, Kafka is down. > Server.log shows: > .. > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log) > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... 
(kafka.log.Log) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > .. > > This line of code in SocketServer.scala causing the error: > {color:#33} currentProcessor = > currentProcessor{color:#d04437} % processors.size{color}{color} > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7021) Source KTable checkpoint is not correct
[ https://issues.apache.org/jira/browse/KAFKA-7021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512813#comment-16512813 ] ASF GitHub Bot commented on KAFKA-7021: --- guozhangwang closed pull request #5207: KAFKA-7021: checkpoint offsets from committed URL: https://github.com/apache/kafka/pull/5207 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 0b028e67381..0c611199d88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -107,7 +107,6 @@ public InternalStreamsBuilder(final InternalTopologyBuilder internalTopologyBuil name); internalTopologyBuilder.addStateStore(storeBuilder, name); - internalTopologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topic); return kTable; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index d9c827fff52..bf6ceded143 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -164,7 +164,7 @@ public String toString(final String indent) { return sb.toString(); } -protected Map recordCollectorOffsets() { +protected Map activeTaskCheckpointableOffsets() { return Collections.emptyMap(); } @@ -239,7 +239,7 @@ void closeStateManager(final boolean writeCheckpoint) throws 
ProcessorStateExcep ProcessorStateException exception = null; log.trace("Closing state manager"); try { -stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null); +stateMgr.close(writeCheckpoint ? activeTaskCheckpointableOffsets() : null); } catch (final ProcessorStateException e) { exception = e; } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 7d4b592b6a6..a9d5a93f228 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -122,7 +122,7 @@ private Map> nodeGroups = null; -interface StateStoreFactory { +public interface StateStoreFactory { Set users(); boolean loggingEnabled(); StateStore build(); @@ -1883,4 +1883,10 @@ public void updateSubscribedTopics(final Set topics, final String logPre subscriptionUpdates.updateTopics(topics); updateSubscriptions(subscriptionUpdates, logPrefix); } + +// following functions are for test only + +public synchronized Map getStateStores() { +return stateFactories; +} } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 195ea99f62d..b0761ac5507 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -46,7 +46,7 @@ private final boolean isStandby; private final ChangelogReader changelogReader; private final Map offsetLimits; -private final Map restoredOffsets; +private final Map standbyRestoredOffsets; private final Map restoreCallbacks; // used for standby tasks, keyed by state topic name 
private final Map storeToChangelogTopic; private final List changelogPartitions = new ArrayList<>(); @@ -79,9 +79,9 @@ public ProcessorStateManager(final TaskId taskId, partitionForTopic.put(source.topic(), source); } offsetLimits = new HashMap<>(); -restoredOffsets = new HashMap<>(); +standbyRestoredOffsets = new HashMap<>(); this.isStandby = isStandby; -restoreCallbacks = isStandby ? new HashMap<>() : null; +restoreCallbacks = isStandby ? new HashMap() : null; this.storeToChangelogTopic =
[jira] [Assigned] (KAFKA-4237) Avoid long request timeout for the consumer
[ https://issues.apache.org/jira/browse/KAFKA-4237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-4237: -- Assignee: Jason Gustafson > Avoid long request timeout for the consumer > --- > > Key: KAFKA-4237 > URL: https://issues.apache.org/jira/browse/KAFKA-4237 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > In the consumer rebalance protocol, the JoinGroup can stay in purgatory on > the server for as long as the rebalance timeout. For the Java client, that > means that the request timeout must be at least as large as the rebalance > timeout (which is governed by {{max.poll.interval.ms}} since KIP-62 and > {{session.timeout.ms}} before then). By default, since 0.10.1, this is 5 > minutes plus some change, which makes the clients slow to detect some hard > failures. > To fix this, two options come to mind: > 1. Right now, all request APIs are limited by the same request timeout in > {{NetworkClient}}, but there's not really any reason why this must be so. We > could use a separate timeout for the JoinGroup request (the implementation > of this is straightforward: > https://github.com/confluentinc/kafka/pull/108/files). > 2. Alternatively, we could prevent the server from holding the JoinGroup in > purgatory for such a long time. Instead, it could return early from the > JoinGroup (say before the session timeout has expired) with an error code > (e.g. REBALANCE_IN_PROGRESS), which tells the client that it should just > resend the JoinGroup. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
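Option 1 in the description amounts to choosing the timeout per request type rather than sharing one value. The Java sketch below illustrates that idea only; the class, the `Api` enum, and the slack parameter are hypothetical names introduced here and are not part of `NetworkClient` or of the linked pull request.

```java
// Hypothetical sketch of a per-request timeout; names are illustrative only.
final class RequestTimeouts {
    enum Api { JOIN_GROUP, FETCH, HEARTBEAT }

    private final long requestTimeoutMs;
    private final long rebalanceTimeoutMs;
    private final long joinGroupSlackMs;

    RequestTimeouts(long requestTimeoutMs, long rebalanceTimeoutMs, long joinGroupSlackMs) {
        this.requestTimeoutMs = requestTimeoutMs;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.joinGroupSlackMs = joinGroupSlackMs;
    }

    // JoinGroup may sit in purgatory for the whole rebalance timeout, so it
    // gets a longer limit; every other request keeps the short one.
    long timeoutMsFor(Api api) {
        if (api == Api.JOIN_GROUP) {
            return Math.max(requestTimeoutMs, rebalanceTimeoutMs + joinGroupSlackMs);
        }
        return requestTimeoutMs;
    }
}
```

With a 30 second request timeout and a 5 minute rebalance timeout, only JoinGroup would wait for minutes, so hard failures on other requests could be detected sooner.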
[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero
[ https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512810#comment-16512810 ] Martin M commented on KAFKA-6982: - Thanks [~omkreddy]. 1.1.1 is not officially available for download. How can we get the fix in 1.1.1? thanks > java.lang.ArithmeticException: / by zero > > > Key: KAFKA-6982 > URL: https://issues.apache.org/jira/browse/KAFKA-6982 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.1.0 > Environment: Environment: Windows 10. >Reporter: wade wu >Priority: Major > Fix For: 1.1.1 > > > Producer keeps sending messages to Kafka, Kafka is down. > Server.log shows: > .. > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log) > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... 
(kafka.log.Log) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > .. > > This line of code in SocketServer.scala causing the error: > {color:#33} currentProcessor = > currentProcessor{color:#d04437} % processors.size{color}{color} > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512809#comment-16512809 ] ASF GitHub Bot commented on KAFKA-6975: --- apovzner opened a new pull request #5229: KAFKA-6975; Fix replica fetching from non-batch-aligned log start offset URL: https://github.com/apache/kafka/pull/5229 It is possible that log start offset may fall in the middle of the batch after AdminClient#deleteRecords(). This will cause a follower starting from log start offset to fail fetching (all records). Use-cases when a follower will start fetching from log start offset includes: 1) new replica due to partition re-assignment; 2) new local replica created as a result of AdminClient#AlterReplicaLogDirs(); 3) broker that was down for some time while AdminClient#deleteRecords() move log start offset beyond its HW. Added two integration tests: 1) Produce and then AdminClient#deleteRecords() while one of the followers is down, and then restart of the follower requires fetching from log start offset; 2) AdminClient#AlterReplicaLogDirs() after AdminClient#deleteRecords() ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > AdminClient.deleteRecords() may cause replicas unable to fetch from beginning > - > > Key: KAFKA-6975 > URL: https://issues.apache.org/jira/browse/KAFKA-6975 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Anna Povzner >Priority: Blocker > Fix For: 2.0.0 > > > AdminClient.deleteRecords(beforeOffset(offset)) will set log start offset to > the requested offset. 
If the requested offset is in the middle of the batch, > the replica will not be able to fetch from that offset (because it is in the > middle of the batch). > One use-case where this could cause problems is replica re-assignment. > Suppose we have a topic partition with 3 initial replicas, and at some point > the user issues AdminClient.deleteRecords() for the offset that falls in the > middle of the batch. It now becomes the log start offset for this topic > partition. Suppose at some later time, the user starts partition > re-assignment to 3 new replicas. The new replicas (followers) will start with > HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < > log start offset (LSO); the follower will be able to reset offset to LSO of > the leader and fetch LSO; the leader will send a batch in response with base > offset stop the fetcher thread. The end result is that the new replicas will not be > able to start fetching unless LSO moves to an offset that is not in the > middle of the batch, and the re-assignment will be stuck for possibly a > very long time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
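The scenario in the description can be pictured with a toy model of a log as a list of batches. The Java sketch below is hypothetical throughout (the `Batch` type and both helpers are invented for illustration and are not Kafka classes). It shows that a fetch from an offset in the middle of a batch is served by a batch whose base offset is lower than the requested offset.

```java
import java.util.List;

// Toy model of a partition log; not Kafka's actual log or record classes.
final class BatchLookup {
    static final class Batch {
        final long baseOffset;
        final long lastOffset;

        Batch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }
    }

    // Base offset of the batch containing fetchOffset, or -1 if none does.
    static long baseOffsetOfBatchContaining(List<Batch> log, long fetchOffset) {
        for (Batch b : log) {
            if (b.baseOffset <= fetchOffset && fetchOffset <= b.lastOffset) {
                return b.baseOffset;
            }
        }
        return -1L;
    }

    // True when the offset falls inside a batch but not on its first record.
    static boolean isMidBatch(List<Batch> log, long offset) {
        long base = baseOffsetOfBatchContaining(log, offset);
        return base != -1L && base != offset;
    }
}
```

For batches covering offsets 0-4, 5-9 and 10-14, a log start offset of 7 is mid-batch and the batch returned for it starts at 5, so a follower that insists on a base offset equal to its fetch offset cannot make progress.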
[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510295#comment-16510295 ] Eugen Feller edited comment on KAFKA-6977 at 6/14/18 5:38 PM: -- Quick update. I have tried with 1.1.0 client and run into issue. I believe at the end two thing helped to make the job stable: 1) Reset offset to latest. That helped with CRC checksum issues. 2) To help with continuous rebalancing I have adjusted max.poll.interval.ms and other settings to: {code:java} requestTimeout: Duration = 40 seconds, maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS), maxPollRecords: Long = 500, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 30 seconds, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 1, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L {code} was (Author: efeller): Quick update. I have tried with 1.1.0 client and run into issue. I believe at the end two thing helped to make the job stable: 1) Reset offset to latest. That helped with CRC checksum issues. 
2) Adjust max.poll.interval.ms and other settings to: {code:java} requestTimeout: Duration = 40 seconds, maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS), maxPollRecords: Long = 500, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 30 seconds, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 1, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L {code} > Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 > while fetching data > - > > Key: KAFKA-6977 > URL: https://issues.apache.org/jira/browse/KAFKA-6977 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > Labels: streams > > We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and > constantly run into the following exception: > {code:java} > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > partition assignment took 40 ms. > current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, > 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, > 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31] > current standby tasks: [] > previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, > 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28] > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > State transition from PARTITIONS_ASSIGNED to RUNNING. 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State > transition from REBALANCING to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > Encountered the following error during processing: > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - >
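The settings in the comment are given as fields of the reporter's own wrapper. The Java sketch below shows how they might translate into plain Kafka configuration keys. The mapping from field names to keys is an assumption made here, the class name is hypothetical, and `fetchMaxBytes` is left out because the comment keeps it at the client default.

```java
import java.util.Properties;

// Hypothetical helper; the field-to-key mapping is assumed, not confirmed
// by the original comment.
final class StreamsSettingsSketch {
    static Properties build() {
        Properties p = new Properties();
        p.setProperty("request.timeout.ms", "40000");            // requestTimeout = 40 seconds
        p.setProperty("max.poll.interval.ms",
                String.valueOf(Integer.MAX_VALUE));              // maxPollInterval
        p.setProperty("max.poll.records", "500");                // maxPollRecords
        p.setProperty("fetch.max.wait.ms", "30000");             // fetchMaxWait = 30 seconds
        p.setProperty("session.timeout.ms", "30000");            // sessionTimeout = 30 seconds
        p.setProperty("heartbeat.interval.ms", "5000");          // heartbeatInterval = 5 seconds
        p.setProperty("state.cleanup.delay.ms",
                String.valueOf(Long.MAX_VALUE));                 // stateDirCleanupDelay
        p.setProperty("num.stream.threads", "1");                // numStreamThreads
        p.setProperty("num.standby.replicas", "1");              // numStandbyReplicas
        p.setProperty("cache.max.bytes.buffering",
                String.valueOf(128L * 1024L * 1024L));           // cacheMaxBytesBuffering
        return p;
    }
}
```

The resulting `Properties` would still need `application.id`, `bootstrap.servers` and serdes before it could be passed to a Streams application.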
[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510295#comment-16510295 ] Eugen Feller edited comment on KAFKA-6977 at 6/14/18 5:37 PM: -- Quick update. I have tried with 1.1.0 client and run into issue. I believe at the end two thing helped to make the job stable: 1) Reset offset to latest. That helped with CRC checksum issues. 2) Adjust max.poll.interval.ms and other settings to: {code:java} requestTimeout: Duration = 40 seconds, maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS), maxPollRecords: Long = 500, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 30 seconds, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 1, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L {code} was (Author: efeller): Quick update. I have tried with 1.1.0 client and run into issue. I believe at the end two thing helped to make the job stable: 1) Reset offset to latest. That helped with CRC checksum issues. 
2) Adjust max.poll.interval.ms and other settings to: {code:java} requestTimeout: Duration = 40 seconds, maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS), maxPollRecords: Long = 500, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 30 seconds, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L {code} > Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 > while fetching data > - > > Key: KAFKA-6977 > URL: https://issues.apache.org/jira/browse/KAFKA-6977 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > Labels: streams > > We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and > constantly run into the following exception: > {code:java} > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > partition assignment took 40 ms. > current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, > 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, > 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31] > current standby tasks: [] > previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, > 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28] > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > State transition from PARTITIONS_ASSIGNED to RUNNING. 
> [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State > transition from REBALANCING to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > Encountered the following error during processing: > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread >
[jira] [Updated] (KAFKA-6982) java.lang.ArithmeticException: / by zero
[ https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar updated KAFKA-6982: - Fix Version/s: (was: 2.0.0) 1.1.1 java.lang.ArithmeticException was fixed in 1.1.1 via KAFKA-6893. > java.lang.ArithmeticException: / by zero > > > Key: KAFKA-6982 > URL: https://issues.apache.org/jira/browse/KAFKA-6982 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.1.0 > Environment: Environment: Windows 10. >Reporter: wade wu >Priority: Major > Fix For: 1.1.1 > > > Producer keeps sending messages to Kafka, Kafka is down. > Server.log shows: > .. > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log) > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... 
(kafka.log.Log) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > .. > > This line of code in SocketServer.scala is causing the error: > currentProcessor = currentProcessor % processors.size
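As an illustration of the failure mode reported in KAFKA-6982, here is a minimal Java sketch of a round-robin index wrapped with the modulo operator. It is not the broker code and not the fix that went in via KAFKA-6893; the class and method names are hypothetical, and the -1 sentinel is only one possible way to signal "no processor available".

```java
final class RoundRobinSketch {
    /** Mirrors the reported expression: throws ArithmeticException when processorCount is 0. */
    static int wrapUnguarded(int currentProcessor, int processorCount) {
        return currentProcessor % processorCount;
    }

    /** Hypothetical guard: returns -1 instead of dividing when there are no processors. */
    static int wrapGuarded(int currentProcessor, int processorCount) {
        if (processorCount == 0) {
            return -1;
        }
        return currentProcessor % processorCount;
    }

    public static void main(String[] args) {
        System.out.println(wrapGuarded(4, 3)); // 1
        System.out.println(wrapGuarded(4, 0)); // -1
        try {
            wrapUnguarded(4, 0);
        } catch (ArithmeticException e) {
            // Integer remainder by zero is rejected at runtime.
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The point of the sketch is only that an empty processor list turns the wrap-around step into an integer division by zero, which matches the stack trace quoted in the report.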
[jira] [Comment Edited] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510295#comment-16510295 ] Eugen Feller edited comment on KAFKA-6977 at 6/14/18 5:18 PM: -- Quick update. I have tried with the 1.1.0 client and ran into the same issue. I believe in the end two things helped to make the job stable: 1) Reset offset to latest. That helped with CRC checksum issues. 2) Adjust max.poll.interval.ms and other settings to: {code:java} requestTimeout: Duration = 40 seconds, maxPollInterval: Duration = Duration(Integer.MAX_VALUE, TimeUnit.MILLISECONDS), maxPollRecords: Long = 500, fetchMaxBytes: Long = ConsumerConfig.DEFAULT_FETCH_MAX_BYTES, fetchMaxWait: Duration = 30 seconds, sessionTimeout: Duration = 30 seconds, heartbeatInterval: Duration = 5 seconds, stateDirCleanupDelay: Long = Long.MaxValue, numStreamThreads: Long = 1, numStandbyReplicas: Long = 0, cacheMaxBytesBuffering: Long = 128 * 1024L * 1024L {code} was (Author: efeller): Quick update. I have tried with 1.1.0 client and same issue. > Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 > while fetching data > - > > Key: KAFKA-6977 > URL: https://issues.apache.org/jira/browse/KAFKA-6977 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 >Reporter: Eugen Feller >Priority: Blocker > Labels: streams > > We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and > constantly run into the following exception: > {code:java} > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > partition assignment took 40 ms. 
> current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, > 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, > 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31] > current standby tasks: [] > previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, > 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28] > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > State transition from PARTITIONS_ASSIGNED to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > INFO org.apache.kafka.streams.KafkaStreams - stream-client > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State > transition from REBALANCING to RUNNING. > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > ERROR org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] > Encountered the following error during processing: > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) > java.lang.IllegalStateException: Unexpected error code 2 while fetching data > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > Shutting down > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.streams.processor.internals.StreamThread - > stream-thread > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > State transition from RUNNING to PENDING_SHUTDOWN. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka > producer with timeoutMillis = 9223372036854775807 ms. > [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] > INFO
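For readers who want to try the tuning values listed in the edited KAFKA-6977 comment, the sketch below expresses them as plain Kafka configuration keys in a java.util.Properties object. The mapping from the comment's field names (requestTimeout, maxPollInterval, and so on) to these keys is an assumption on my part, the class name is hypothetical, and fetchMaxBytes is left out because the comment keeps it at the client default.

```java
import java.util.Properties;

final class StreamsTuningSketch {
    /** Builds the settings from the comment; the key mapping is assumed, not confirmed by the author. */
    static Properties tunedProperties() {
        Properties props = new Properties();
        // Consumer-side settings (durations converted to milliseconds).
        props.put("request.timeout.ms", "40000");
        props.put("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        props.put("max.poll.records", "500");
        props.put("fetch.max.wait.ms", "30000");
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "5000");
        // Kafka Streams settings.
        props.put("state.cleanup.delay.ms", String.valueOf(Long.MAX_VALUE));
        props.put("num.stream.threads", "1");
        props.put("num.standby.replicas", "0");
        props.put("cache.max.bytes.buffering", String.valueOf(128L * 1024L * 1024L));
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        tunedProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would still need the usual application id, bootstrap servers, and serde settings before they could be handed to a Streams application.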
[jira] [Updated] (KAFKA-6977) Kafka Streams - java.lang.IllegalStateException: Unexpected error code 2 while fetching data
[ https://issues.apache.org/jira/browse/KAFKA-6977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eugen Feller updated KAFKA-6977: Description: We are running Kafka Streams 0.11.0.1 with Kafka Broker 0.10.2.1 and constantly run into the following exception: {code:java} [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] partition assignment took 40 ms. current active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28, 0_2, 0_6, 0_7, 0_8, 0_9, 0_11, 0_16, 0_18, 0_19, 0_22, 0_24, 0_26, 0_27, 0_29, 0_30, 0_31] current standby tasks: [] previous active tasks: [0_0, 0_1, 0_3, 0_4, 0_5, 0_10, 0_12, 0_13, 0_14, 0_15, 0_17, 0_20, 0_21, 0_23, 0_25, 0_28] [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING. [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e] State transition from REBALANCING to RUNNING. 
[visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [visitstats-mongowriter-838cf286-be42-4a49-b3ab-3a291617233e-StreamThread-1] Encountered the following error during processing: at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480) at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:536) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:490) java.lang.IllegalStateException: Unexpected error code 2 while fetching data at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457) [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] Shutting down [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN. [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
[visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] Stream thread shutdown complete [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD. [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] State transition from RUNNING to ERROR. [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] WARN org.apache.kafka.streams.KafkaStreams - stream-client [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4] All stream threads have died. The Kafka Streams instance will be in an error state and should be closed. 6062195 [visitstats-mongowriter-3727734b-3c2f-4775-87d2-838d72ca3bf4-StreamThread-1] FATAL com.zenreach.data.flows.visitstatsmongoexporter.MongoVisitStatsWriter$ - Exiting main on uncaught exception java.lang.IllegalStateException: Unexpected error code 2 while fetching data at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:886) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:525) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1086) at
[jira] [Commented] (KAFKA-6982) java.lang.ArithmeticException: / by zero
[ https://issues.apache.org/jira/browse/KAFKA-6982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512758#comment-16512758 ] Martin M commented on KAFKA-6982: - How can we get a patch for 1.1.0? Thanks > java.lang.ArithmeticException: / by zero > > > Key: KAFKA-6982 > URL: https://issues.apache.org/jira/browse/KAFKA-6982 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.1.0 > Environment: Environment: Windows 10. >Reporter: wade wu >Priority: Major > Fix For: 2.0.0 > > > Producer keeps sending messages to Kafka, Kafka is down. > Server.log shows: > .. > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... (kafka.log.Log) > [2018-06-01 17:01:33,945] WARN [Log partition=__consumer_offsets-6, > dir=D:\data\Kafka\kafka-logs] Found a corrupted index file corresponding to > log file > D:\data\Kafka\kafka-logs__consumer_offsets-6\.log due to > Corrupt index found, index file > (D:\data\Kafka\kafka-logs__consumer_offsets-6\.index) has > non-zero size but the last offset is 0 which is no greater than the base > offset 0.}, recovering segment and rebuilding index files... 
(kafka.log.Log) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > [2018-06-01 17:01:34,664] ERROR Error while accepting connection > (kafka.network.Acceptor) > java.lang.ArithmeticException: / by zero > at kafka.network.Acceptor.run(SocketServer.scala:354) > at java.lang.Thread.run(Thread.java:748) > .. > > This line of code in SocketServer.scala is causing the error: > currentProcessor = currentProcessor % processors.size
[jira] [Commented] (KAFKA-4696) Streams standby task assignment should be state-store aware
[ https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512722#comment-16512722 ] Richard Yu commented on KAFKA-4696: --- Hi, I will probably restart my work on this. > Streams standby task assignment should be state-store aware > --- > > Key: KAFKA-4696 > URL: https://issues.apache.org/jira/browse/KAFKA-4696 > Project: Kafka > Issue Type: Sub-task > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Damian Guy >Assignee: Richard Yu >Priority: Major > > Task Assignment is currently not aware of which tasks have State Stores. This > can result in uneven balance of standby task assignment as all tasks are > assigned, but only those tasks with state-stores are ever created by > {{StreamThread}}. So what seems like an optimal strategy during assignment > time could be sub-optimal post-assignment. > For example, let's say we have 4 tasks (2 with state-stores), 2 clients, > numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks. > One of the clients may end up with both state-store tasks, while the other > has none. > Further to this, standby task configuration is currently "all or nothing". It > might make sense to allow more fine-grained configuration, i.e., the ability > to specify the number of standby replicas individually for each stateful > operator.
[jira] [Resolved] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6975. Resolution: Fixed Fix Version/s: (was: 1.1.1) (was: 1.0.2) > AdminClient.deleteRecords() may cause replicas unable to fetch from beginning > - > > Key: KAFKA-6975 > URL: https://issues.apache.org/jira/browse/KAFKA-6975 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Anna Povzner >Priority: Blocker > Fix For: 2.0.0 > > > AdminClient.deleteRecords(beforeOffset(offset)) will set the log start offset to > the requested offset. If the requested offset is in the middle of the batch, > the replica will not be able to fetch from that offset (because it is in the > middle of the batch). > One use-case where this could cause problems is replica re-assignment. > Suppose we have a topic partition with 3 initial replicas, and at some point > the user issues AdminClient.deleteRecords() for the offset that falls in the > middle of the batch. It now becomes the log start offset for this topic > partition. Suppose at some later time, the user starts partition > re-assignment to 3 new replicas. The new replicas (followers) will start with > HW = 0, will try to fetch from 0, then get "out of order offset" because 0 < > log start offset (LSO); the follower will be able to reset offset to LSO of > the leader and fetch LSO; the leader will send a batch in response with a base > offset below LSO, which will stop the fetcher thread. The end result is that the new replicas will not be > able to start fetching unless LSO moves to an offset that is not in the > middle of the batch, and the re-assignment will be stuck for a possibly > very long time.
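To make the "log start offset in the middle of a batch" condition from KAFKA-6975 concrete, here is a small self-contained Java sketch. It does not use any Kafka classes: the batch layout is a hypothetical array of batch base offsets, and the helper names are invented for illustration.

```java
final class BatchAlignmentSketch {
    /**
     * Returns the base offset of the batch that contains the given offset,
     * or -1 if the offset lies before the first batch.
     * Assumes sortedBaseOffsets is ascending and the last batch is open-ended.
     */
    static long baseOffsetOfBatchContaining(long[] sortedBaseOffsets, long offset) {
        long base = -1L;
        for (long candidate : sortedBaseOffsets) {
            if (candidate <= offset) {
                base = candidate;
            } else {
                break;
            }
        }
        return base;
    }

    /** True when the log start offset coincides with the start of a batch. */
    static boolean isBatchAligned(long[] sortedBaseOffsets, long logStartOffset) {
        return baseOffsetOfBatchContaining(sortedBaseOffsets, logStartOffset) == logStartOffset;
    }

    public static void main(String[] args) {
        long[] batches = {0L, 5L, 10L};  // hypothetical batches starting at offsets 0, 5 and 10
        long logStartOffset = 7L;        // e.g. after deleting records before offset 7
        // The batch returned for a fetch at 7 starts at 5, which is below the log start offset.
        System.out.println(baseOffsetOfBatchContaining(batches, logStartOffset)); // 5
        System.out.println(isBatchAligned(batches, logStartOffset));              // false
        System.out.println(isBatchAligned(batches, 10L));                         // true
    }
}
```

In the scenario described in the report, a follower fetching at offset 7 would receive the batch that begins at 5, which is the mismatch the description identifies as the source of the stuck re-assignment.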
[jira] [Commented] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required
[ https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512623#comment-16512623 ] Rajini Sivaram commented on KAFKA-6806: --- Thanks [~rhauch] > Unable to validate sink connectors without "topics" component which is not > required > --- > > Key: KAFKA-6806 > URL: https://issues.apache.org/jira/browse/KAFKA-6806 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: CP4.1, Centos7 >Reporter: Ivan Majnarić >Priority: Major > > The bug occurs when you try to create a new connector through, for example, > kafka-connect-ui. > Previously, both source and sink connectors could be validated through REST > with only "connector.class", without the "topics" add-on, like this: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector" > }{code} > In the new version, CP4.1, you can still validate *source connectors* but > not *sink connectors*. If you want to validate sink connectors you need to > add the "topics" config to the request, like: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector", > "topics": "test-topic" > }{code} > So there is a small mismatch in how connectors are validated, which I > think happened accidentally.
[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512618#comment-16512618 ] Gunnar Morling commented on KAFKA-7058: --- Thanks for the quick review, [~rhauch]. It's definitely not a blocker for us, we worked around it by having our own equals() implementation for schemas for now. I'm curious about the Kafka versions maintained going forward, though. Will it be 0.10.x and 2.0, but not 1.x? > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.1.0 >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
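The behavior described in KAFKA-7058 can be reproduced with the JDK alone. The sketch below compares two equal byte arrays with Objects.equals and with Objects.deepEquals; it is an illustration of the problem, not the ConnectSchema code and not necessarily the change made in the PR, and the class and helper names are hypothetical.

```java
import java.util.Objects;

final class ArrayDefaultEqualsSketch {
    /** Hypothetical helper: compares default values so that arrays are compared by content. */
    static boolean defaultsEqual(Object left, Object right) {
        return Objects.deepEquals(left, right);
    }

    public static void main(String[] args) {
        Object first = new byte[] {1, 2, 3};
        Object second = new byte[] {1, 2, 3};
        // Objects.equals falls back to Object#equals, which is reference equality for arrays.
        System.out.println(Objects.equals(first, second)); // false
        // Objects.deepEquals compares array contents element by element.
        System.out.println(defaultsEqual(first, second));  // true
        // Non-array values behave the same under both comparisons.
        System.out.println(defaultsEqual("abc", "abc"));   // true
    }
}
```

A custom equals() workaround like the one mentioned in the comment could be built on the same content-based comparison.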
[jira] [Commented] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512615#comment-16512615 ] Richard Yu commented on KAFKA-6583: --- [~mjsax] Did you finish the upgrade path for metadata? If so, we probably could start restart this PR. > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, in the need for clients to be more evenly balanced, stateful tasks > should be distributed in such a manner that it will be spread equally. > However, for such an awareness to be implemented during task assignment, it > would require the need for the present rebalance protocol metadata to also > contain the number of state stores in a particular task. This way, it will > allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6583) Metadata should include number of state stores for task
[ https://issues.apache.org/jira/browse/KAFKA-6583?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512615#comment-16512615 ] Richard Yu edited comment on KAFKA-6583 at 6/14/18 3:31 PM: [~mjsax] Did you finish the upgrade path for metadata? If so, we probably could restart this PR. was (Author: yohan123): [~mjsax] Did you finish the upgrade path for metadata? If so, we probably could start restart this PR. > Metadata should include number of state stores for task > --- > > Key: KAFKA-6583 > URL: https://issues.apache.org/jira/browse/KAFKA-6583 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.2.0, 0.11.0.0 >Reporter: Richard Yu >Priority: Critical > Labels: needs-kip > > Currently, in the need for clients to be more evenly balanced, stateful tasks > should be distributed in such a manner that it will be spread equally. > However, for such an awareness to be implemented during task assignment, it > would require the need for the present rebalance protocol metadata to also > contain the number of state stores in a particular task. This way, it will > allow us to "weight" tasks during assignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6975) AdminClient.deleteRecords() may cause replicas unable to fetch from beginning
[ https://issues.apache.org/jira/browse/KAFKA-6975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512609#comment-16512609 ] ASF GitHub Bot commented on KAFKA-6975: --- hachikuji closed pull request #5133: KAFKA-6975: Fix fetching from non-batch-aligned log start offset URL: https://github.com/apache/kafka/pull/5133 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index b9180a45378..55f870e96f7 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import com.yammer.metrics.core.Gauge import kafka.api.LeaderAndIsr import kafka.api.Request +import kafka.common.UnexpectedAppendOffsetException import kafka.controller.KafkaController import kafka.log.{LogAppendInfo, LogConfig} import kafka.metrics.KafkaMetricsGroup @@ -30,7 +31,7 @@ import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils._ import kafka.zk.AdminZkClient import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} +import org.apache.kafka.common.errors.{ReplicaNotAvailableException, NotEnoughReplicasException, NotLeaderForPartitionException, PolicyViolationException} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.record.MemoryRecords @@ -187,6 +188,10 @@ class Partition(val topic: String, def getReplica(replicaId: Int = localBrokerId): Option[Replica] = Option(allReplicasMap.get(replicaId)) + def 
getReplicaOrException(replicaId: Int = localBrokerId): Replica = +getReplica(replicaId).getOrElse( + throw new ReplicaNotAvailableException(s"Replica $replicaId is not available for partition $topicPartition")) + def leaderReplicaIfLocal: Option[Replica] = leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica) @@ -545,15 +550,41 @@ class Partition(val topic: String, laggingReplicas } - def appendRecordsToFutureReplica(records: MemoryRecords) { - getReplica(Request.FutureLocalReplicaId).get.log.get.appendAsFollower(records) + private def doAppendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean): Unit = { + if (isFuture) + getReplicaOrException(Request.FutureLocalReplicaId).log.get.appendAsFollower(records) + else { +// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread +// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. +inReadLock(leaderIsrUpdateLock) { + getReplicaOrException().log.get.appendAsFollower(records) +} + } } - def appendRecordsToFollower(records: MemoryRecords) { -// The read lock is needed to prevent the follower replica from being updated while ReplicaAlterDirThread -// is executing maybeDeleteAndSwapFutureReplica() to replace follower replica with the future replica. 
-inReadLock(leaderIsrUpdateLock) { - getReplica().get.log.get.appendAsFollower(records) + def appendRecordsToFollowerOrFutureReplica(records: MemoryRecords, isFuture: Boolean) { +try { + doAppendRecordsToFollowerOrFutureReplica(records, isFuture) +} catch { + case e: UnexpectedAppendOffsetException => +val replica = if (isFuture) getReplicaOrException(Request.FutureLocalReplicaId) else getReplicaOrException() +val logEndOffset = replica.logEndOffset.messageOffset +if (logEndOffset == replica.logStartOffset && +e.firstOffset < logEndOffset && e.lastOffset >= logEndOffset) { + // This may happen if the log start offset on the leader (or current replica) falls in + // the middle of the batch due to delete records request and the follower tries to + // fetch its first offset from the leader. + // We handle this case here instead of Log#append() because we will need to remove the + // segment that start with log start offset and create a new one with earlier offset + // (base offset of the batch), which will move recoveryPoint backwards, so we will need + // to checkpoint the new recovery point before we append + val replicaName = if (isFuture) "future replica" else "follower" + info(s"Unexpected offset in append to $topicPartition. First offset ${e.firstOffset} is less than log start offset
[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512606#comment-16512606 ] Randall Hauch commented on KAFKA-7058: -- [~gunnar.morling] thanks for the PR. It looks good, but we're a few days past code freeze and even though this is a small change I'm not sure we can call this a real blocker. I'm going to approve it, but we'll probably have to wait until after the 2.0 release to merge. The good news is that this should backport pretty cleanly back to the 0.10 branch, and so it would be in the next patch release on the various branches whenever they occur. > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.1.0 >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-7058: - Affects Version/s: 1.0.0 1.1.0 > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0, 1.1.0 >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required
[ https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512586#comment-16512586 ] Randall Hauch edited comment on KAFKA-6806 at 6/14/18 3:10 PM: --- [~rsivaram], this is strictly speaking not a blocker, and since we're past code freeze we should push it. The fix is not that trivial, either, so it would be unnecessary risk for the release. [~iMajna], I have an idea that would make the fix quite straightforward, but it will require a very simple KIP to add a default method to the Validator that takes the whole configuration and which by default delegates to the current method. ConfigDef could then call this new method, but since this would be backward compatible no existing implementations would need to change. However, in cases like this we could implement the new method and get the behavior we want. I'll start working on the KIP, but it will have to wait until 2.1. Unfortunately we just missed the deadline by a few days to fix this the hard way. :-( was (Author: rhauch): [~rsivaram], this is strictly speaking not a blocker, and since we're past code freeze we should push it. The fix is not that trivial, either, so it would be unnecessary risk for the release. [~iMajna], I have an idea that would make the fix quite straightforward, but it will require a very simple KIP to add a default method to the Validator that takes the whole configuration and which by default delegates to the current method. ConfigDef could then call this new method, but since this would be backward compatible no existing implementations would need to change. However, in cases like this we could implement the new method and get the behavior we want. 
> Unable to validate sink connectors without "topics" component which is not > required > --- > > Key: KAFKA-6806 > URL: https://issues.apache.org/jira/browse/KAFKA-6806 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: CP4.1, Centos7 >Reporter: Ivan Majnarić >Priority: Major > > The bug occurs when you try to create a new connector through, for example, > kafka-connect-ui. > Previously, both source and sink connectors could be validated through REST > with only "connector.class", without the "topics" add-on, like this: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector" > }{code} > In the new version, CP4.1, you can still validate *source connectors* but > not *sink connectors*. If you want to validate sink connectors you need to > add the "topics" config to the request, like: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector", > "topics": "test-topic" > }{code} > So there is a small mismatch in how connectors are validated, which I > think happened accidentally.
[jira] [Updated] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required
[ https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6806: - Priority: Major (was: Blocker) Fix Version/s: (was: 1.1.1) (was: 2.0.0) [~rsivaram], this is strictly speaking not a blocker, and since we're past code freeze we should push it. The fix is not that trivial, either, so it would be unnecessary risk for the release. [~iMajna], I have an idea that would make the fix quite straightforward, but it will require a very simple KIP to add a default method to the Validator that takes the whole configuration and which by default delegates to the current method. ConfigDef could then call this new method, but since this would be backward compatible no existing implementations would need to change. However, in cases like this we could implement the new method and get the behavior we want. > Unable to validate sink connectors without "topics" component which is not > required > --- > > Key: KAFKA-6806 > URL: https://issues.apache.org/jira/browse/KAFKA-6806 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: CP4.1, Centos7 >Reporter: Ivan Majnarić >Priority: Major > > The bug happens when you try to create a new connector through, for example, > kafka-connect-ui. > Previously, both source and sink connectors could be validated through REST > without "topics", sending only "connector.class", like this: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector" > }{code} > In the new version of CP4.1 you can still validate *source connectors* but > not *sink connectors*. 
If you want to validate sink connectors you need to > add the "topics" config to the request, like: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector", > "topics": "test-topic" > }{code} > So there is a small mismatch in how the two connector types are validated, which I > think happened accidentally. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
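The idea in the comment above (a default method on the validator that receives the whole configuration and, by default, delegates to the current method) could look roughly like the sketch below. It is only an illustration under stated assumptions: the `Validator` interface is a local stand-in and not Kafka's own type, and the three-argument overload, the `TOPICS_OR_REGEX` validator and its `topics.regex` check are all made up for the example. The real shape would be whatever the KIP ends up proposing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class WholeConfigValidatorSketch {

    /** Local stand-in for a per-key validator; NOT the real Kafka interface. */
    interface Validator {
        // The existing single-value check that current implementations provide.
        void ensureValid(String name, Object value);

        // Hypothetical new overload: receives the whole configuration and, by
        // default, delegates to the existing method, so existing implementations
        // keep compiling and behaving exactly as before.
        default void ensureValid(String name, Object value, Map<String, ?> wholeConfig) {
            ensureValid(name, value);
        }
    }

    /** Existing-style validator: implements only the old method (here as a lambda). */
    static final Validator NON_NULL = (name, value) -> {
        if (value == null) {
            throw new IllegalArgumentException(name + " must be set");
        }
    };

    /** New-style validator: overrides the overload so it can look at a second key. */
    static final Validator TOPICS_OR_REGEX = new Validator() {
        @Override
        public void ensureValid(String name, Object value) {
            // Nothing can be decided from the single value alone.
        }

        @Override
        public void ensureValid(String name, Object value, Map<String, ?> wholeConfig) {
            boolean hasTopics = value != null && !value.toString().isEmpty();
            Object regex = wholeConfig.get("topics.regex");
            boolean hasRegex = regex != null && !regex.toString().isEmpty();
            if (!hasTopics && !hasRegex) {
                throw new IllegalArgumentException("either " + name + " or topics.regex must be set");
            }
        }
    };

    /** What the caller would do under this assumption: always use the new overload. */
    static boolean isValid(Validator validator, String name, Map<String, ?> config) {
        try {
            validator.ensureValid(name, config.get(name), config);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("topics.regex", "test-.*");
        System.out.println(isValid(TOPICS_OR_REGEX, "topics", config));
        System.out.println(isValid(NON_NULL, "topics", config));
        System.out.println(isValid(TOPICS_OR_REGEX, "topics", Collections.<String, Object>emptyMap()));
    }
}
```

Because the interface still has a single abstract method, validators written as lambdas keep working unchanged, which is the backward-compatibility property the comment relies on.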
[jira] [Commented] (KAFKA-7059) Offer new constructor on ProducerRecord
[ https://issues.apache.org/jira/browse/KAFKA-7059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512572#comment-16512572 ] ASF GitHub Bot commented on KAFKA-7059: --- matzew opened a new pull request #5228: KAFKA-7059 Offer new constructor on ProducerRecord URL: https://github.com/apache/kafka/pull/5228 Signed-off-by: Matthias Wessendorf Done for https://issues.apache.org/jira/browse/KAFKA-7059 When developers create a ProducerRecord with custom headers, they currently have to use a constructor with a slightly longer argument list. This is OK, but it would be more convenient if there was a ctor like: ```java public ProducerRecord(String topic, K key, V value, Iterable<Header> headers) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Offer new constructor on ProducerRecord > > > Key: KAFKA-7059 > URL: https://issues.apache.org/jira/browse/KAFKA-7059 > Project: Kafka > Issue Type: Improvement >Reporter: Matthias Weßendorf >Priority: Trivial > Fix For: 2.0.1 > > > Creating a ProducerRecord with custom headers requires usage of a > constructor with a slightly longer argument list. > > It would be more convenient if there was a ctor like: > {code} > public ProducerRecord(String topic, K key, V value, Iterable<Header> headers) > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7059) Offer new constructor on ProducerRecord
Matthias Weßendorf created KAFKA-7059: - Summary: Offer new constructor on ProducerRecord Key: KAFKA-7059 URL: https://issues.apache.org/jira/browse/KAFKA-7059 Project: Kafka Issue Type: Improvement Reporter: Matthias Weßendorf Fix For: 2.0.1 Creating a ProducerRecord with custom headers requires usage of a constructor with a slightly longer argument list. It would be more convenient if there was a ctor like: {code} public ProducerRecord(String topic, K key, V value, Iterable<Header> headers) {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
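For readers skimming the request, the pattern being asked for is a shorter constructor that delegates to the longer one. The following is a minimal sketch of that pattern only. `SketchRecord` and `Header` are hypothetical stand-ins defined here so the example is self-contained; they are not the Kafka client classes, and the exact fields are assumptions.

```java
import java.util.Collections;

final class ConvenienceConstructorSketch {

    /** Hypothetical header type, standing in for a real header class. */
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical record type; NOT org.apache.kafka.clients.producer.ProducerRecord. */
    static final class SketchRecord<K, V> {
        final String topic;
        final Integer partition; // null here means "no explicit partition given"
        final K key;
        final V value;
        final Iterable<Header> headers;

        // The longer form: headers can only be passed together with a partition argument.
        SketchRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.headers = headers;
        }

        // The requested shorter form: delegates and leaves the partition unset.
        SketchRecord(String topic, K key, V value, Iterable<Header> headers) {
            this(topic, null, key, value, headers);
        }
    }

    public static void main(String[] args) {
        Iterable<Header> headers = Collections.singletonList(new Header("trace-id", new byte[] {1}));
        SketchRecord<String, String> record = new SketchRecord<>("events", "k", "v", headers);
        System.out.println(record.topic + " partition=" + record.partition);
    }
}
```

With the shorter form, callers no longer have to pass an explicit `null` for an argument they do not care about.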
[jira] [Updated] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned
[ https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Magesh kumar Nandakumar updated KAFKA-7039: --- Priority: Blocker (was: Critical) > DelegatingClassLoader creates plugin instance even if its not Versioned > --- > > Key: KAFKA-7039 > URL: https://issues.apache.org/jira/browse/KAFKA-7039 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.0.0 > > > The versioned interface was introduced as part of > [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin]. > DelegatingClassLoader is now attempting to create an instance of all the > plugins, even if it's not required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned
[ https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Magesh kumar Nandakumar updated KAFKA-7039: --- Fix Version/s: (was: 2.1.0) 2.0.0 > DelegatingClassLoader creates plugin instance even if its not Versioned > --- > > Key: KAFKA-7039 > URL: https://issues.apache.org/jira/browse/KAFKA-7039 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Blocker > Fix For: 2.0.0 > > > The versioned interface was introduced as part of > [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin]. > DelegatingClassLoader is now attempting to create an instance of all the > plugins, even if it's not required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned
[ https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512493#comment-16512493 ] Magesh kumar Nandakumar commented on KAFKA-7039: [~rsivaram] I think this should be marked Blocker for 2.0 since without this some of the connectors could be broken depending on how the plugin path is configured. > DelegatingClassLoader creates plugin instance even if its not Versioned > --- > > Key: KAFKA-7039 > URL: https://issues.apache.org/jira/browse/KAFKA-7039 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Critical > Fix For: 2.0.0 > > > The versioned interface was introduced as part of > [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin]. > DelegatingClassLoader is now attempting to create an instance of all the > plugins, even if it's not required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
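The problem described in this issue (creating an instance of every plugin just to ask for its version) can be avoided by checking the class before instantiating it. A minimal sketch, assuming a local `Versioned` interface and two hypothetical plugin classes; it is not the code in `DelegatingClassLoader`, and the `"undefined"` fallback string is made up for the example:

```java
final class VersionedCheckSketch {

    /** Local stand-in for a "this plugin can report its version" interface. */
    interface Versioned {
        String version();
    }

    /** Counts constructor calls so the effect of the check can be observed. */
    static int instancesCreated = 0;

    /** Hypothetical plugin that does not expose a version. */
    static class PlainPlugin {
        PlainPlugin() {
            instancesCreated++;
        }
    }

    /** Hypothetical plugin that does expose a version. */
    static class VersionedPlugin implements Versioned {
        VersionedPlugin() {
            instancesCreated++;
        }

        @Override
        public String version() {
            return "1.0";
        }
    }

    /** Instantiates the plugin only if its class implements Versioned. */
    static String versionFor(Class<?> pluginClass) {
        if (!Versioned.class.isAssignableFrom(pluginClass)) {
            return "undefined"; // no instance is created for non-versioned plugins
        }
        try {
            Versioned plugin = (Versioned) pluginClass.getDeclaredConstructor().newInstance();
            return plugin.version();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not instantiate " + pluginClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(versionFor(PlainPlugin.class) + " instances=" + instancesCreated);
        System.out.println(versionFor(VersionedPlugin.class) + " instances=" + instancesCreated);
    }
}
```

The check is on the class object, so a plugin whose constructor has side effects or fails is never touched unless a version can actually be obtained from it.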
[jira] [Resolved] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6711. --- Resolution: Fixed Reviewer: Matthias J. Sax Fix Version/s: 2.0.0 > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > Fix For: 2.0.0 > > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
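To see why a checkpointed offset is harmful for an in-memory store, here is a small simulation of the restart described in the report. It is a sketch under simplifying assumptions: the changelog is a plain list of `key=value` strings, the store is a `HashMap`, and `restore` is a hypothetical helper, none of which is Kafka Streams code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class InMemoryCheckpointSketch {

    /** Replays "key=value" changelog records, starting at startOffset, into a fresh store. */
    static Map<String, String> restore(List<String> changelog, long startOffset) {
        // An in-memory store has lost its contents after a restart, so it starts empty.
        Map<String, String> store = new HashMap<>();
        for (int offset = (int) startOffset; offset < changelog.size(); offset++) {
            String[] keyAndValue = changelog.get(offset).split("=", 2);
            store.put(keyAndValue[0], keyAndValue[1]);
        }
        return store;
    }

    public static void main(String[] args) {
        List<String> changelog = Arrays.asList("a=1", "b=2", "c=3");

        // Restart with a checkpoint entry saying the first three records were already applied:
        // nothing is replayed, and the empty in-memory store stays empty.
        System.out.println(restore(changelog, 3L));

        // Restart without a checkpoint entry for this topic: everything is replayed.
        System.out.println(restore(changelog, 0L));
    }
}
```

This is why leaving in-memory stores out of the checkpoint file fixes the data loss: with no entry for the topic, the store is rebuilt from the start of the changelog.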
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512359#comment-16512359 ] ASF GitHub Bot commented on KAFKA-6711: --- rajinisivaram closed pull request #5219: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file URL: https://github.com/apache/kafka/pull/5219 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 78c4a363f29..a4ec23d4c49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -42,6 +42,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -62,6 +63,7 @@
     private final int retries;
     private final long retryBackoffMs;
     private final Duration pollTime;
+    private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -71,6 +73,14 @@ public GlobalStateManagerImpl(final LogContext logContext,
                                   final StreamsConfig config) {
         super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
+        // Find non persistent store's topics
+        final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+        for (final StateStore store : topology.globalStateStores()) {
+            if (!store.persistent()) {
+                globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
+
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
@@ -337,13 +347,22 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException {
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
         checkpointableOffsets.putAll(offsets);
-        if (!checkpointableOffsets.isEmpty()) {
-            try {
-                checkpoint.write(checkpointableOffsets);
-            } catch (final IOException e) {
-                log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
+
+        final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+        // Skip non persistent store
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet()) {
+            final String topic = topicPartitionOffset.getKey().topic();
+            if (!globalNonPersistentStoresTopics.contains(topic)) {
+                filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
             }
         }
+
+        try {
+            checkpoint.write(filteredOffsets);
+        } catch (final IOException e) {
+            log.warn("Failed to write offset checkpoint file to {} for global stores: {}", checkpoint, e);
+        }
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 900e65276ee..013e2b68110 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -22,13 +22,13 @@
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
@@ -38,6 +38,7 @@
 import org.apache.kafka.streams.state.KeyValueStore;
 import
[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512270#comment-16512270 ] ASF GitHub Bot commented on KAFKA-7058: --- gunnarmorling opened a new pull request #5225: KAFKA-7058 Comparing schema default values using Objects#deepEquals() URL: https://github.com/apache/kafka/pull/5225 https://issues.apache.org/jira/browse/KAFKA-7058 * Summary of testing strategy: Added new unit test ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
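The difference between the two `java.util.Objects` methods named in this issue is easy to reproduce with the JDK alone. The sketch below uses plain `byte[]` values as stand-ins for array-typed schema defaults; the class and helper names are made up for the example and are not part of Kafka Connect.

```java
import java.util.Objects;

final class ArrayDefaultEqualitySketch {

    /** Objects.equals falls back to Object.equals, which for arrays compares identity only. */
    static boolean shallowEqual(Object left, Object right) {
        return Objects.equals(left, right);
    }

    /** Objects.deepEquals compares array contents element by element. */
    static boolean deepEqual(Object left, Object right) {
        return Objects.deepEquals(left, right);
    }

    public static void main(String[] args) {
        byte[] firstDefault = {1, 2, 3};
        byte[] secondDefault = {1, 2, 3}; // same contents, different array instance

        System.out.println(shallowEqual(firstDefault, secondDefault)); // false
        System.out.println(deepEqual(firstDefault, secondDefault));    // true
        System.out.println(deepEqual("a", "a"));                       // true
    }
}
```

For non-array values `Objects.deepEquals` behaves like `Objects.equals`, so switching only changes the outcome for array-typed defaults.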
[jira] [Commented] (KAFKA-7015) Enhance RecordCollectorImpl exceptions with more context information
[ https://issues.apache.org/jira/browse/KAFKA-7015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512266#comment-16512266 ] ASF GitHub Bot commented on KAFKA-7015: --- jadireddi opened a new pull request #5224: KAFKA-7015: Fixed RecordCollectorImpl exception messages with more human readable context info. URL: https://github.com/apache/kafka/pull/5224 https://issues.apache.org/jira/browse/KAFKA-7015 Changed the `RecordCollectorImpl` class so that its exception messages show the key/value pair in a human-readable format. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enhance RecordCollectorImpl exceptions with more context information > - > > Key: KAFKA-7015 > URL: https://issues.apache.org/jira/browse/KAFKA-7015 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Jagadesh Adireddi >Priority: Minor > > In KIP-182 we refactored all stores to be plain {{Bytes/byte[]}} stores and > only have concrete key/value types on outer layers/wrappers of the stores. > For this reason, the innermost {{RocksDBStore}} cannot provide useful error > messages anymore if a put/get/delete operation fails as it only handles plain > bytes. 
> In addition, the corresponding calls to record changelog records to record > collectors will also be sending byte arrays only, and hence when there is an > error happening, the record collector can only display the key but not the > value since it is all bytes: > {code:java} > [ERROR] org.apache.kafka.streams.processor.internals.RecordCollectorImpl - > task [2_2] Error sending record (key {"eventId":XXX,"version":123} > value [] timestamp YYY) to topic TTT > due to ... > {code} > The store exceptions got fixed via KAFKA-6538. > This Jira is to track the fix for RecordCollectorImpl. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-7058: -- Component/s: KafkaConnect > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default > values, but this doesn't work correctly if the default values in fact are > arrays. In this case, always {false} will be returned, also if the default > value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gunnar Morling updated KAFKA-7058: -- Description: {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' default values, but this doesn't work correctly if the default values in fact are arrays. In this case, always {{false}} will be returned, also if the default value arrays actually are the same. (was: {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default values, but this doesn't work correctly if the default values in fact are arrays. In this case, always {false} will be returned, also if the default value arrays actually are the same.) > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
[ https://issues.apache.org/jira/browse/KAFKA-7058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512261#comment-16512261 ] Gunnar Morling commented on KAFKA-7058: --- I'll send a pull request for this in a bit. > ConnectSchema#equals() broken for array-typed default values > > > Key: KAFKA-7058 > URL: https://issues.apache.org/jira/browse/KAFKA-7058 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Gunnar Morling >Priority: Major > > {{ConnectSchema#equals()}} calls {{Objects#equals()}} for the schemas' > default values, but this doesn't work correctly if the default values in fact > are arrays. In this case, always {{false}} will be returned, also if the > default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7058) ConnectSchema#equals() broken for array-typed default values
Gunnar Morling created KAFKA-7058: - Summary: ConnectSchema#equals() broken for array-typed default values Key: KAFKA-7058 URL: https://issues.apache.org/jira/browse/KAFKA-7058 Project: Kafka Issue Type: Bug Reporter: Gunnar Morling {ConnectSchema#equals()} calls {{Objects#equals()}} for the schemas' default values, but this doesn't work correctly if the default values in fact are arrays. In this case, always {false} will be returned, also if the default value arrays actually are the same. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7057) Consumer stop polling
[ https://issues.apache.org/jira/browse/KAFKA-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Moshe Lavi updated KAFKA-7057: -- Description: We build 3 Kafka brokers (0.10.1.1) version using Spring Cloud Stream consumer to poll messages. We encountered consumer lags alerted and found some consumers were blocked and not polling anymore messages. This requires us to restart the microservice where that consumer resides. I wonder if this has to do with lack of available threads or to the fact the heartbeat daemon does not exist/work. *The thread dump shows:* kafka-coordinator-heartbeat-thread | SiteAgreementItem" #4943 daemon prio=5 os_prio=0 tid=0x7f3abdd08000 nid=0x83ac waiting for monitor entry [0x7f3a5dcdb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409) - waiting to lock <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865) - locked <0x0005df800488> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) -kafka-consumer-1" #4940 prio=5 os_prio=0 tid=0x7f3a8d433800 nid=0x838e runnable [0x7f3a5dedd000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x0005df7705e0> (a sun.nio.ch.Util$2) - locked <0x0005df7705d0> (a java.util.Collections$UnmodifiableSet) - locked <0x0005df7705f0> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at 
org.apache.kafka.common.network.Selector.select(Selector.java:470) at org.apache.kafka.common.network.Selector.poll(Selector.java:286) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) - locked <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:745) was: We build 3 Kafka brokers (0.10.1.1) version using Spring Cloud Stream consumer to poll messages. We encountered consumer lags alerted and found some consumers were blocked and not polling anymore messages. This requires us to restart the microservice where that consumer resides. I wonder if this has to do with lack of available threads or to the fact there heartbeat daemon does not exist/work. 
*The thread dump shows:* kafka-coordinator-heartbeat-thread | SiteAgreementItem" #4943 daemon prio=5 os_prio=0 tid=0x7f3abdd08000 nid=0x83ac waiting for monitor entry [0x7f3a5dcdb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409) - waiting to lock <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865) - locked <0x0005df800488> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) -kafka-consumer-1" #4940 prio=5 os_prio=0 tid=0x7f3a8d433800 nid=0x838e runnable [0x7f3a5dedd000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
[jira] [Created] (KAFKA-7057) Consumer stop polling
Moshe Lavi created KAFKA-7057: - Summary: Consumer stop polling Key: KAFKA-7057 URL: https://issues.apache.org/jira/browse/KAFKA-7057 Project: Kafka Issue Type: Bug Components: consumer, controller Affects Versions: 0.10.1.1 Reporter: Moshe Lavi We build 3 Kafka brokers (0.10.1.1) version using Spring Cloud Stream consumer to poll messages. We encountered consumer lags alerted and found some consumers were blocked and not polling anymore messages. This requires us to restart the microservice where that consumer resides. I wonder if this has to do with lack of available threads or to the fact there heartbeat daemon does not exist/work. *The thread dump shows:* kafka-coordinator-heartbeat-thread | SiteAgreementItem" #4943 daemon prio=5 os_prio=0 tid=0x7f3abdd08000 nid=0x83ac waiting for monitor entry [0x7f3a5dcdb000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409) - waiting to lock <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:865) - locked <0x0005df800488> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) -kafka-consumer-1" #4940 prio=5 os_prio=0 tid=0x7f3a8d433800 nid=0x838e runnable [0x7f3a5dedd000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x0005df7705e0> (a sun.nio.ch.Util$2) - locked <0x0005df7705d0> (a java.util.Collections$UnmodifiableSet) - locked <0x0005df7705f0> (a sun.nio.ch.EPollSelectorImpl) at 
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:470) at org.apache.kafka.common.network.Selector.poll(Selector.java:286) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) - locked <*0x0005df800450*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
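As an aid for reading the dump above: the heartbeat thread is reported as BLOCKED because it is waiting to enter a monitor (`0x0005df800450`) that the polling thread currently holds. The sketch below reproduces just that pattern with two plain threads. It is not a diagnosis of the reported hang and uses no Kafka classes; the thread names and the `observeWaiterState` helper are made up for the example.

```java
import java.util.concurrent.CountDownLatch;

final class BlockedMonitorSketch {

    /** Returns the state observed for a thread waiting to enter a monitor held by another thread. */
    static Thread.State observeWaiterState() {
        final Object lock = new Object();
        final CountDownLatch holderHasLock = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        // Plays the role of the polling thread: it enters the monitor and stays inside.
        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holderHasLock.countDown();
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "holder");

        // Plays the role of the heartbeat thread: it needs the same monitor.
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                // nothing to do once the monitor is finally acquired
            }
        }, "waiter");

        holder.setDaemon(true);
        waiter.setDaemon(true);
        try {
            holder.start();
            holderHasLock.await();
            waiter.start();

            // Poll until the waiter is reported as BLOCKED (bounded, so the sketch cannot hang).
            Thread.State state = waiter.getState();
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (state != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(1);
                state = waiter.getState();
            }

            release.countDown(); // let the holder leave the monitor so both threads can finish
            holder.join();
            waiter.join();
            return state;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("waiter state while the lock is held: " + observeWaiterState());
    }
}
```

A BLOCKED entry in a single dump only shows who held the monitor at that instant; several dumps taken a few seconds apart are needed to tell a short wait from a thread that never gets the lock.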
[jira] [Resolved] (KAFKA-4812) We are facing the same issue as SAMZA-590 for kafka
[ https://issues.apache.org/jira/browse/KAFKA-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar resolved KAFKA-4812. -- Resolution: Auto Closed Closing inactive issue. The old consumer is no longer supported. Please upgrade to the Java consumer whenever possible. > We are facing the same issue as SAMZA-590 for kafka > --- > > Key: KAFKA-4812 > URL: https://issues.apache.org/jira/browse/KAFKA-4812 > Project: Kafka > Issue Type: Bug >Reporter: Manjeer Srujan. Y >Priority: Critical > > Dead Kafka broker ignores new leader. > We are facing the same issue as samza issue below. But, we couldn't find any > fix for this in kafka. Pasted the log below for reference. > The kafka client that we are using is below. > group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.8.2.1' > https://issues.apache.org/jira/browse/SAMZA-590 > 2017-02-28 09:50:53.189 29708 [Thread-11-vendor-index-spout-executor[35 35]] > ERROR org.apache.storm.daemon.executor - - java.lang.RuntimeException: > java.nio.channels.ClosedChannelException > at > org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) > at > org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) > at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129) > at > org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648) > at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) > at clojure.lang.AFn.run(AFn.java:22) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.nio.channels.ClosedChannelException > at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) > at > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) > at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) > at > kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) > at > 
kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) > at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:75) > at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:65) > at > org.apache.storm.kafka.PartitionManager.(PartitionManager.java:94) > at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) > ... 6 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required
[ https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Majnarić updated KAFKA-6806: - Environment: CP4.1, Centos7 (was: CP4.1., Centos7) > Unable to validate sink connectors without "topics" component which is not > required > --- > > Key: KAFKA-6806 > URL: https://issues.apache.org/jira/browse/KAFKA-6806 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: CP4.1, Centos7 >Reporter: Ivan Majnarić >Priority: Blocker > Fix For: 2.0.0, 1.1.1 > > > The bug happens when you try to create a new connector through, for example, > kafka-connect-ui. > Previously, both source and sink connectors could be validated through REST > without "topics", sending only "connector.class", like this: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector" > }{code} > In the new version of CP4.1 you can still validate *source connectors* but > not *sink connectors*. If you want to validate sink connectors you need to > add the "topics" config to the request, like: > {code:java} > PUT / > http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate > { > "connector.class": > "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector", > "topics": "test-topic" > }{code} > So there is a small mismatch in how the two connector types are validated, which I > think happened accidentally. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required
[ https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512145#comment-16512145 ] Ivan Majnarić commented on KAFKA-6806: -- Hi [~rsivaram] and [~lindong] [~rhauch] set this as a Blocker so I suppose this should be treated like it is > Unable to validate sink connectors without "topics" component which is not > required > --- > > Key: KAFKA-6806 > URL: https://issues.apache.org/jira/browse/KAFKA-6806 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: CP4.1., Centos7 >Reporter: Ivan Majnarić >Priority: Blocker > Fix For: 2.0.0, 1.1.1
[jira] [Updated] (KAFKA-7039) DelegatingClassLoader creates plugin instance even if its not Versioned
[ https://issues.apache.org/jira/browse/KAFKA-7039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-7039: -- Fix Version/s: (was: 2.0.0) 2.1.0 [~mageshn] Moving this out to 2.1.0 since it hasn't been merged yet. Feel free to include in 2.0.0 and update the merge version if completed in time, > DelegatingClassLoader creates plugin instance even if its not Versioned > --- > > Key: KAFKA-7039 > URL: https://issues.apache.org/jira/browse/KAFKA-7039 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.0.0 >Reporter: Magesh kumar Nandakumar >Assignee: Magesh kumar Nandakumar >Priority: Critical > Fix For: 2.1.0 > > > The versioned interface was introduced as part of > [KIP-285|https://cwiki.apache.org/confluence/display/KAFKA/KIP-285%3A+Connect+Rest+Extension+Plugin]. > DelegatingClassLoader is now attempting to create an instance of all the > plugins, even if it's not required. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7028) super.users doesn't work with custom principals
[ https://issues.apache.org/jira/browse/KAFKA-7028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-7028: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since there is no PR yet. > super.users doesn't work with custom principals > --- > > Key: KAFKA-7028 > URL: https://issues.apache.org/jira/browse/KAFKA-7028 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Priority: Major > Fix For: 2.1.0 > > > SimpleAclAuthorizer creates a KafkaPrincipal for the users defined in the > super.users broker config. However, it should use the configured > KafkaPrincipalBuilder so that it works with a custom defined one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7045) Consumer may not be able to consume all messages when down-conversion is required
[ https://issues.apache.org/jira/browse/KAFKA-7045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-7045:
Fix Version/s: (was: 2.0.0) 2.0.1

[~dhruvilshah] Moving this out to 2.0.1 since it is not ready yet. If you do get a fix ready in time, please include it in 2.0.0 and update the fix version.

> Consumer may not be able to consume all messages when down-conversion is required
>
> Key: KAFKA-7045
> URL: https://issues.apache.org/jira/browse/KAFKA-7045
> Project: Kafka
> Issue Type: Bug
> Components: consumer, core
> Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2, 1.1.0, 2.0.0, 1.0.1
> Reporter: Dhruvil Shah
> Assignee: Dhruvil Shah
> Priority: Major
> Fix For: 2.0.1
>
> Attachments: log-cleaner-test.zip
>
> When down-conversion is required, the consumer might fail to consume messages under certain conditions. A couple of such cases are outlined below:
> (1) When consuming from a compacted topic, it is possible that the consumer wants to fetch messages that fall in the middle of a batch but the messages have been compacted by the cleaner. For example, let's say we have the following two segments. The brackets indicate a single batch of messages and the numbers within are the message offsets.
> Segment #1: [0, 1, 2], [3, 4, 5], [6, 7, 8]
> Segment #2: [9, 10, 11], [12, 13, 14]
> If the cleaner were to come in now and clean up messages with offsets 7 and 8, the segments would look like the following:
> Segment #1: [0, 1, 2], [3, 4, 5], [6]
> Segment #2: [9, 10, 11], [12, 13, 14]
> A consumer attempting to fetch messages at offset 7 will start reading the batch starting at offset 6. During down-conversion, we will drop the record at offset 6 because it is less than the current fetch start offset. However, there are no messages in the log following offset 6. In such cases, we return the `FileRecords` itself, which would cause the consumer to throw an exception because it does not understand the stored message format.
> (2) When consuming from a topic with transactional messages, down-conversion usually drops control batches because these do not exist in V0 and V1 message formats. If there are no message batches following the control batch in the particular segment (or if we are at the end of the log), we would again get no records after down-conversion and will return the `FileRecords`. Because the consumer is not able to interpret control batches, it will again throw an exception.
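Case (1) above can be replayed with a small stand-alone model. This is only a sketch under stated assumptions: `DownConversionSketch` and `convertSegment` are hypothetical names, batches are modelled as plain lists of offsets, and none of this is the broker's actual down-conversion code. It only shows why a fetch at offset 7 against the cleaned Segment #1 yields no records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of one log segment; not Kafka code.
final class DownConversionSketch {
    /** Returns the offsets that survive a down-converting read of one segment. */
    static List<Long> convertSegment(List<List<Long>> batches, long fetchOffset) {
        // The read starts at the last batch whose first offset is <= fetchOffset.
        int start = 0;
        for (int i = 0; i < batches.size(); i++) {
            if (batches.get(i).get(0) <= fetchOffset) {
                start = i;
            }
        }
        // Records below the fetch offset are dropped during conversion.
        List<Long> kept = new ArrayList<>();
        for (List<Long> batch : batches.subList(start, batches.size())) {
            for (long offset : batch) {
                if (offset >= fetchOffset) {
                    kept.add(offset);
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Segment #1 after the cleaner removed offsets 7 and 8.
        List<List<Long>> segment1 = Arrays.asList(
                Arrays.asList(0L, 1L, 2L), Arrays.asList(3L, 4L, 5L), Arrays.asList(6L));
        System.out.println(convertSegment(segment1, 7L)); // prints []
        System.out.println(convertSegment(segment1, 4L)); // prints [4, 5, 6]
    }
}
```

The empty result for offset 7 is the situation in which, per the description, the unconverted `FileRecords` is returned instead.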
[jira] [Updated] (KAFKA-6805) Allow dynamic broker configs to be configured in ZooKeeper before starting broker
[ https://issues.apache.org/jira/browse/KAFKA-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6805: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since it is not ready yet. > Allow dynamic broker configs to be configured in ZooKeeper before starting > broker > - > > Key: KAFKA-6805 > URL: https://issues.apache.org/jira/browse/KAFKA-6805 > Project: Kafka > Issue Type: Task > Components: tools >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.1.0 > > > At the moment, dynamic broker configs like SSL keystore and password can be > configured using ConfigCommand only after a broker is started (using the new > AdminClient). To start a broker, these configs have to be defined in > server.properties. We want to restrict updates using ZooKeeper once broker > starts up, but we should allow updates using ZK prior to starting brokers. > This is particularly useful for password configs which are stored encrypted > in ZK, making it difficult to set manually before starting brokers. > ConfigCommand is being updated to talk to AdminClient under KIP-248, but we > will need to maintain the tool using ZK to enable credentials to be created > in ZK before starting brokers. So the functionality to set broker configs can > sit alongside that. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6697) JBOD configured broker should not die if log directory is invalid
[ https://issues.apache.org/jira/browse/KAFKA-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6697: -- Fix Version/s: (was: 2.0.0) 2.1.0 [~lindong] Moving this to 2.1.0 since it is not ready yet. Feel free to include in 2.0.0 and update the fix version if completed in time. > JBOD configured broker should not die if log directory is invalid > - > > Key: KAFKA-6697 > URL: https://issues.apache.org/jira/browse/KAFKA-6697 > Project: Kafka > Issue Type: Bug >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: 2.1.0 > > > Currently JBOD configured broker will still die on startup if > dir.getCanonicalPath() throws IOException. We should mark such log directory > as offline and broker should still run if there is good disk. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6684) Support casting values with bytes schema to string
[ https://issues.apache.org/jira/browse/KAFKA-6684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6684: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since it is not ready yet. Feel free to include in 2.0.0 and change the fix version if ready to merge in time. > Support casting values with bytes schema to string > --- > > Key: KAFKA-6684 > URL: https://issues.apache.org/jira/browse/KAFKA-6684 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Amit Sela >Priority: Critical > Fix For: 2.1.0 > > > Casting from BYTES is not supported, which means that casting LogicalTypes is > not supported. > This proposes to allow casting anything to a string, kind of like Java's > {{toString()}}, such that if the object is actually a LogicalType it can be > "serialized" as string instead of bytes+schema. > > {noformat} > Examples: > BigDecimal will cast to the string representation of the number. > Timestamp will cast to the string representation of the timestamp, or maybe > UTC mmddTHH:MM:SS.f format? > {noformat} > > Worst case, bytes are "casted" to whatever the {{toString()}} returns - its > up to the user to know the data. > This would help when using a JSON sink, or anything that's not Avro. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required
[ https://issues.apache.org/jira/browse/KAFKA-6806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512110#comment-16512110 ] Rajini Sivaram commented on KAFKA-6806: --- Hi [~rhauch] [~iMajna] Same question as above for 2.0.0. Is this really a blocker? > Unable to validate sink connectors without "topics" component which is not > required > --- > > Key: KAFKA-6806 > URL: https://issues.apache.org/jira/browse/KAFKA-6806 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 > Environment: CP4.1., Centos7 >Reporter: Ivan Majnarić >Priority: Blocker > Fix For: 2.0.0, 1.1.1
[jira] [Updated] (KAFKA-6342) Move workaround for JSON parsing of non-escaped strings
[ https://issues.apache.org/jira/browse/KAFKA-6342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6342: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since it is not ready in time for 2.0.0 > Move workaround for JSON parsing of non-escaped strings > --- > > Key: KAFKA-6342 > URL: https://issues.apache.org/jira/browse/KAFKA-6342 > Project: Kafka > Issue Type: Task > Components: core >Reporter: Rajini Sivaram >Assignee: Umesh Chaudhary >Priority: Major > Fix For: 2.1.0 > > > KAFKA-6319 added a workaround to parse invalid JSON persisted using older > versions of Kafka because special characters were not escaped. The workaround > is required in 1.0.1 to enable parsing invalid JSON from ACL configs in > ZooKeeper. We can move the workaround out of kafka.utils.Json#parseFull for > 1.1.0 so that it is applied only to ACLs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7051) Improve the efficiency of the ReplicaManager when there are many partitions
[ https://issues.apache.org/jira/browse/KAFKA-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-7051: -- Fix Version/s: (was: 2.0.0) 2.1.0 [~cmccabe] Moving this out to 2.1.0 since it is not ready yet. But feel free to include in 2.0.0 if reviewed on time. > Improve the efficiency of the ReplicaManager when there are many partitions > --- > > Key: KAFKA-7051 > URL: https://issues.apache.org/jira/browse/KAFKA-7051 > Project: Kafka > Issue Type: Bug > Components: replication >Affects Versions: 0.8.0 >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > Fix For: 2.1.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6897) Mirrormaker waits to shut down forever on produce failure with abort.on.send.failure=true
[ https://issues.apache.org/jira/browse/KAFKA-6897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6897: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since it is not ready for 2.0.0 > Mirrormaker waits to shut down forever on produce failure with > abort.on.send.failure=true > -- > > Key: KAFKA-6897 > URL: https://issues.apache.org/jira/browse/KAFKA-6897 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Koelli Mungee >Assignee: Dhruvil Shah >Priority: Major > Fix For: 2.1.0 > > Attachments: mirror_maker_thread_dump.log > > > Mirrormaker never shuts down after a produce failure when > abort.on.send.failure=true > {code:java} > [2018-05-07 08:29:32,417] ERROR Error when sending message to topic test with > key: 52 bytes, value: 32615 bytes with error: > (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > test-11: 45646 ms has passed since last append > [2018-05-07 08:29:32,434] INFO Closing producer due to send failure. > (kafka.tools.MirrorMaker$) > [2018-05-07 08:29:32,434] INFO [Producer clientId=producer-1] Closing the > Kafka producer with timeoutMillis = 0 ms. > (org.apache.kafka.clients.producer.KafkaProducer) > {code} > A stack trace of this mirrormaker process 9 hours later shows that the main > thread is still active and it is waiting for metadata that it will never get > since the producer send thread is no longer running. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
[ https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-6648:
Fix Version/s: (was: 2.0.0) 2.1.0

Moving this out to 2.1.0 since it is not ready for 2.0.0.

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.0.0, 0.11.0.2
> Reporter: radai rosenblatt
> Assignee: radai rosenblatt
> Priority: Major
> Fix For: 2.1.0
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> This leads to inconsistent behavior upstream, for example in KafkaConsumer.partitionsFor(), where if there's valid metadata all partitions would be returned, whereas if metadata doesn't exist (or has expired) a subset of partitions (only the healthy ones) would be returned.
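The inconsistency reported in KAFKA-6648 comes down to two different views of the same topic. The following is a minimal stand-alone sketch, assuming a partition counts as "available" when it has a leader; `PartitionViewSketch` and its `Partition` type are hypothetical stand-ins, not Kafka's `Cluster` or `PartitionInfo`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for cluster metadata; not Kafka code.
final class PartitionViewSketch {
    static final class Partition {
        final int id;
        final Integer leader; // null models a partition without a live leader

        Partition(int id, Integer leader) {
            this.id = id;
            this.leader = leader;
        }
    }

    /** Every partition of the topic, healthy or not. */
    static List<Integer> allPartitions(List<Partition> partitions) {
        List<Integer> ids = new ArrayList<>();
        for (Partition p : partitions) {
            ids.add(p.id);
        }
        return ids;
    }

    /** Only the partitions that currently have a leader. */
    static List<Integer> availablePartitions(List<Partition> partitions) {
        List<Integer> ids = new ArrayList<>();
        for (Partition p : partitions) {
            if (p.leader != null) {
                ids.add(p.id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Partition> topic = Arrays.asList(
                new Partition(0, 1), new Partition(1, null), new Partition(2, 3));
        System.out.println(allPartitions(topic));       // prints [0, 1, 2]
        System.out.println(availablePartitions(topic)); // prints [0, 2]
    }
}
```

A caller that sometimes receives the first list and sometimes the second, depending on whether metadata was cached, sees the partition count of a topic change without any change to the topic itself.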
[jira] [Updated] (KAFKA-5445) Document exceptions thrown by AdminClient methods
[ https://issues.apache.org/jira/browse/KAFKA-5445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-5445: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since it is not ready for 2.0.0 > Document exceptions thrown by AdminClient methods > - > > Key: KAFKA-5445 > URL: https://issues.apache.org/jira/browse/KAFKA-5445 > Project: Kafka > Issue Type: Improvement > Components: admin, clients >Reporter: Ismael Juma >Assignee: Andrey Dyachkov >Priority: Major > Fix For: 1.0.2, 2.1.0 > > > AdminClient should document the exceptions that users may have to handle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5098) KafkaProducer.send() blocks and generates TimeoutException if topic name has illegal char
[ https://issues.apache.org/jira/browse/KAFKA-5098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-5098: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since it is not ready for 2.0.0 > KafkaProducer.send() blocks and generates TimeoutException if topic name has > illegal char > - > > Key: KAFKA-5098 > URL: https://issues.apache.org/jira/browse/KAFKA-5098 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 0.10.2.0 > Environment: Java client running against server using > wurstmeister/kafka Docker image. >Reporter: Jeff Larsen >Assignee: huxihx >Priority: Major > Fix For: 2.1.0 > > > The server is running with auto create enabled. If we try to publish to a > topic with a forward slash in the name, the call blocks and we get a > TimeoutException in the Callback. I would expect it to return immediately > with an InvalidTopicException. > There are other blocking issues that have been reported which may be related > to some degree, but this particular cause seems unrelated. 
> Sample code:
> {code}
> import org.apache.kafka.clients.producer.*;
> import java.util.*;
>
> public class KafkaProducerUnexpectedBlockingAndTimeoutException {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "kafka.example.com:9092");
>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("max.block.ms", 10000); // 10 seconds should illustrate our point
>     String separator = "/";
>     //String separator = "_";
>     try (Producer<String, String> producer = new KafkaProducer<>(props)) {
>       System.out.println("Calling KafkaProducer.send() at " + new Date());
>       producer.send(
>           new ProducerRecord<String, String>("abc" + separator + "someStreamName",
>               "Not expecting a TimeoutException here"),
>           new Callback() {
>             @Override
>             public void onCompletion(RecordMetadata metadata, Exception e) {
>               if (e != null) {
>                 System.out.println(e.toString());
>               }
>             }
>           });
>       System.out.println("KafkaProducer.send() completed at " + new Date());
>     }
>   }
> }
> {code}
> Switching to the underscore separator in the above example works as expected.
> Mea culpa: We neglected to research allowed chars in a topic name, but the TimeoutException we encountered did not help point us in the right direction.
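Until the producer fails fast on its own, a caller can check the topic name before calling send(). The sketch below assumes the legal set is ASCII letters, digits, '.', '_' and '-', with a maximum length of 249 and with "." and ".." disallowed, which matches Kafka's topic naming rules as far as I know. `TopicNameCheck` is a hypothetical helper, not part of the Kafka client API.

```java
import java.util.regex.Pattern;

// Hypothetical client-side guard; not part of the Kafka client API.
final class TopicNameCheck {
    // Assumed legal characters: ASCII letters, digits, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    // Assumed maximum topic name length.
    private static final int MAX_LENGTH = 249;

    static boolean isLegalTopicName(String topic) {
        return topic != null
                && !topic.isEmpty()
                && topic.length() <= MAX_LENGTH
                && !topic.equals(".")
                && !topic.equals("..")
                && LEGAL.matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegalTopicName("abc/someStreamName")); // prints false
        System.out.println(isLegalTopicName("abc_someStreamName")); // prints true
    }
}
```

Rejecting the name up front turns the delayed TimeoutException into an immediate, descriptive failure in the caller's own code.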
[jira] [Updated] (KAFKA-6780) log cleaner shouldn't clean messages beyond high watermark
[ https://issues.apache.org/jira/browse/KAFKA-6780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-6780: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since there is no PR yet. > log cleaner shouldn't clean messages beyond high watermark > -- > > Key: KAFKA-6780 > URL: https://issues.apache.org/jira/browse/KAFKA-6780 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0 >Reporter: Jun Rao >Assignee: Anna Povzner >Priority: Major > Fix For: 1.1.1, 2.1.0 > > > Currently, the firstUncleanableDirtyOffset computed by the log cleaner is > bounded by the first offset in the active segment. It's possible for the high > watermark to be smaller than that. This may cause a committed record to be > removed because of an uncommitted record. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
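One way to read the proposal in KAFKA-6780 is that the cleaner's upper bound should be the smaller of the active segment's first offset and the high watermark. The sketch below is only an illustration of that reading; `CleanerBoundSketch` and its method name are hypothetical and this is not the actual patch.

```java
// Hypothetical illustration of the proposed bound; not the LogCleaner code.
final class CleanerBoundSketch {
    /** The cleaner must stop before whichever comes first: the active segment or the high watermark. */
    static long firstUncleanableOffset(long activeSegmentBaseOffset, long highWatermark) {
        return Math.min(activeSegmentBaseOffset, highWatermark);
    }

    public static void main(String[] args) {
        // High watermark (90) is behind the active segment (100): cleaning stops at 90.
        System.out.println(firstUncleanableOffset(100L, 90L)); // prints 90
        // High watermark (120) is inside the active segment: cleaning stops at 100.
        System.out.println(firstUncleanableOffset(100L, 120L)); // prints 100
    }
}
```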
[jira] [Updated] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-4950:
Fix Version/s: (was: 2.0.0) 2.1.0

Moving this out since it is not ready for 2.0.0

> ConcurrentModificationException when iterating over Kafka Metrics
>
> Key: KAFKA-4950
> URL: https://issues.apache.org/jira/browse/KAFKA-4950
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.1.1
> Reporter: Dumitru Postoronca
> Assignee: Sébastien Launay
> Priority: Minor
> Fix For: 1.0.2, 2.1.0
>
> It looks like when calling {{PartitionStates.partitionSet()}}, while the resulting HashSet is being built, the internal state of the allocations can change, which leads to a ConcurrentModificationException during the copy operation.
> {code}
> java.util.ConcurrentModificationException
>   at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>   at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at java.util.HashSet.<init>(HashSet.java:119)
>   at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
>   at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
>   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> {code}
> {code}
> // client code:
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import com.codahale.metrics.Gauge;
> import com.codahale.metrics.Metric;
> import com.codahale.metrics.MetricSet;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.MetricName;
> import static com.codahale.metrics.MetricRegistry.name;
>
> public class KafkaMetricSet implements MetricSet {
>   private final KafkaConsumer<?, ?> client;
>
>   public KafkaMetricSet(KafkaConsumer<?, ?> client) {
>     this.client = client;
>   }
>
>   @Override
>   public Map<String, Metric> getMetrics() {
>     final Map<String, Metric> gauges = new HashMap<>();
>     Map<MetricName, ? extends org.apache.kafka.common.Metric> m = client.metrics();
>     for (Map.Entry<MetricName, ? extends org.apache.kafka.common.Metric> e : m.entrySet()) {
>       gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), new Gauge<Double>() {
>         @Override
>         public Double getValue() {
>           return e.getValue().value(); // exception thrown here
>         }
>       });
>     }
>     return Collections.unmodifiableMap(gauges);
>   }
> }
> {code}
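The failure mode in the trace, copying a LinkedHashMap's key set while the map is structurally modified, can be reproduced deterministically in a single thread. The sketch below also shows one generic remedy: taking the snapshot under the same lock as the writers. `SnapshotSketch` is a hypothetical class and the locking is an assumption made for illustration, not a description of how Kafka resolved the issue.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not Kafka's PartitionStates.
final class SnapshotSketch {
    private final Map<String, Integer> state = new LinkedHashMap<>();

    synchronized void put(String key, int value) {
        state.put(key, value);
    }

    /** Copies the keys while holding the same lock as put(), so the copy cannot race with a writer. */
    synchronized Set<String> keySnapshot() {
        return new HashSet<>(state.keySet());
    }

    /** Shows that adding keys while iterating the key set fails fast. */
    static boolean unsafeIterationFails() {
        Map<String, Integer> map = new LinkedHashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        try {
            for (String key : map.keySet()) {
                map.put(key + "-copy", 0); // structural modification during iteration
            }
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(unsafeIterationFails()); // prints true
        SnapshotSketch s = new SnapshotSketch();
        s.put("topic-0", 1);
        Set<String> snapshot = s.keySnapshot();
        s.put("topic-1", 2);
        System.out.println(snapshot.size()); // prints 1
    }
}
```

In the reported case the reader is a metrics thread and the writer is the consumer thread, so the exception appears only intermittently.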
[jira] [Updated] (KAFKA-5479) Docs for authorization omit authorizer.class.name
[ https://issues.apache.org/jira/browse/KAFKA-5479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-5479: -- Fix Version/s: (was: 2.0.0) 2.1.0 Moving this out to 2.1.0 since it is not ready for 2.0.0 > Docs for authorization omit authorizer.class.name > - > > Key: KAFKA-5479 > URL: https://issues.apache.org/jira/browse/KAFKA-5479 > Project: Kafka > Issue Type: Improvement > Components: documentation >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Minor > Labels: patch-available > Fix For: 2.1.0 > > > The documentation in §7.4 Authorization and ACLs doesn't mention the > {{authorizer.class.name}} setting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap
[ https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512051#comment-16512051 ]

Don commented on KAFKA-4113:

Hi Ben, you had a very interesting link in the original comment. Coincidentally, we were looking into how to bootstrap a KTable/GlobalKTable when you posted this. It's giving a 404 now :/ Was there any technical reason for removing it?

> Allow KTable bootstrap
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
> Issue Type: New Feature
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Guozhang Wang
> Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the data. Only after this topic has been read completely and the KTable is ready should the application start processing. This would mean that on startup, the current partition sizes must be fetched and stored, and after the KTable has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, i.e., if no update to a KTable is done for a certain time, we consider it populated
> 3) a timestamp cut-off point, i.e., all records with an older timestamp belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without reading any other topics until we see one record with timestamp 1000.
> {noformat}
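The "main idea" in the description, recording the partition end offsets at startup and waiting until the table has caught up to them, reduces to a simple completion check. The sketch below is a stand-alone illustration under stated assumptions: `BootstrapCheck` and its maps keyed by partition number are hypothetical, and in a real application the startup snapshot could come from something like KafkaConsumer.endOffsets().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical completion check for the "populate up to the startup end offsets" idea.
final class BootstrapCheck {
    /**
     * Returns true once every partition has been read at least up to the
     * end offset that was recorded when the application started.
     */
    static boolean isBootstrapped(Map<Integer, Long> endOffsetsAtStartup, Map<Integer, Long> positions) {
        for (Map.Entry<Integer, Long> end : endOffsetsAtStartup.entrySet()) {
            long position = positions.getOrDefault(end.getKey(), 0L);
            if (position < end.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 5L);
        endOffsets.put(1, 3L);

        Map<Integer, Long> positions = new HashMap<>();
        positions.put(0, 5L);
        System.out.println(isBootstrapped(endOffsets, positions)); // prints false

        positions.put(1, 4L);
        System.out.println(isBootstrapped(endOffsets, positions)); // prints true
    }
}
```

Records that arrive after the snapshot do not delay the start of processing, which is what distinguishes this approach from the idle-period and timestamp cut-off options.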