[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349560#comment-16349560
 ] 

ASF GitHub Bot commented on KAFKA-6487:
---

guozhangwang closed pull request #4495: KAFKA-6487: 
ChangeLoggingKeyValueBytesStore does not propagate delete
URL: https://github.com/apache/kafka/pull/4495
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 8dc457a9949..94ee275a3bf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -77,8 +77,8 @@ public void putAll(final List> 
entries) {
 
 @Override
 public byte[] delete(final Bytes key) {
-final byte[] oldValue = inner.get(key);
-put(key, null);
+final byte[] oldValue = inner.delete(key);
+changeLogger.logChange(key, null);
 return oldValue;
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
deleted file mode 100644
index ea9f7aa8713..000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueStore.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StateSerdes;
-
-import java.util.ArrayList;
-import java.util.List;
-
-class ChangeLoggingKeyValueStore extends 
WrappedStateStore.AbstractStateStore implements KeyValueStore {
-private final ChangeLoggingKeyValueBytesStore innerBytes;
-private final Serde keySerde;
-private final Serde valueSerde;
-private StateSerdes serdes;
-
-
-ChangeLoggingKeyValueStore(final KeyValueStore bytesStore,
-   final Serde keySerde,
-   final Serde valueSerde) {
-this(new ChangeLoggingKeyValueBytesStore(bytesStore), keySerde, 
valueSerde);
-}
-
-private ChangeLoggingKeyValueStore(final ChangeLoggingKeyValueBytesStore 
bytesStore,
-   final Serde keySerde,
-   final Serde valueSerde) {
-super(bytesStore);
-this.innerBytes = bytesStore;
-this.keySerde = keySerde;
-this.valueSerde = valueSerde;
-}
-
-@SuppressWarnings("unchecked")
-@Override
-public void init(final ProcessorContext context, final StateStore root) {
-innerBytes.init(context, root);
-
-serdes = new 
StateSerdes<>(ProcessorStateManager.storeChangelogTopic(context.applicationId(),
 innerBytes.name()),
-   keySerde == null ? (Serde) 
context.keySerde() : keySerde,
-   valueSerde == null ? (Serde) 
context.valueSerde() : valueSerde);
-}
-
-@Override
-public long approximateNumEntries() {
-return innerBytes.approximateNumEntries();
-}
-
-@Override
-public void put(final K key, final V value) {
-final Bytes bytesKey = 

[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347199#comment-16347199
 ] 

Matthias J. Sax commented on KAFKA-6487:


@bdevylde I added you to the list of contributors. You can assign tickets to 
yourself now.

Please, assign tickets to yourself before you start working on it. Also update 
the ticket to "work in progress" and "patch available" if possible. If you are 
interested in a ticket that is assigned but does not show any activity, feel 
free to leave a comment and ask if you can take if over. Thanks for 
contributing!

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bart De Vylder
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347165#comment-16347165
 ] 

Bill Bejeck commented on KAFKA-6487:


No worries [~bdevylde] I'll re-assign the ticket to you

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Bart De Vylder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346831#comment-16346831
 ] 

Bart De Vylder commented on KAFKA-6487:
---

[~bbejeck] i started working on this issue before I saw this ticket. I made a 
PR for it here: [https://github.com/apache/kafka/pull/4495]

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346824#comment-16346824
 ] 

ASF GitHub Bot commented on KAFKA-6487:
---

bartdevylder opened a new pull request #4495: KAFKA-6487: 
ChangeLoggingKeyValueBytesStore does not propagate delete
URL: https://github.com/apache/kafka/pull/4495
 
 
   The `ChangeLoggingKeyValueBytesStore` used to write null to its underlying 
store instead of propagating the delete, which has two drawbacks:
   - an iterator will see null values
   - unbounded memory growth of the underlying in-memory keyvalue store
   
   The fix will just propagate the delete instead of performing put(key, null). 
   
   The changes to the tests:
   - extra test whether the key is really gone after delete by calling the 
`approximateEntries` on the underlying store. This number is exact because we 
know the underlying store in the test is of type `InMemoryKeyValueStore`
   - extra test to check a delete is logged as   (the existing test 
would also succeed if the key is just absent)
   
   While also updating the corresponding tests of the 
`ChangeLoggingKeyValueStore` I noticed the class is nowhere used anymore so I 
removed it from the source code for clarity.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)