Repository: kafka
Updated Branches:
  refs/heads/trunk 8c7e66313 -> 6910baf54


KAFKA-4772: Exploit #peek to implement #print() and other methods

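This change removes the `KeyValuePrinter` and `KStreamForeach` classes.
`KStreamForeach` is replaced by `KStreamPeek`, which now takes a flag that
controls whether records are forwarded downstream, and `KeyValuePrinter` is
replaced by the new `KStreamPrint` processor together with `PrintForeachAction`.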

Author: jameschien <jamesch...@staff.ruten.com.tw>
Author: JamesChien <jedich...@users.noreply.github.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2955 from jedichien/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6910baf5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6910baf5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6910baf5

Branch: refs/heads/trunk
Commit: 6910baf548ebad4c1530432e51be40793b4a4f10
Parents: 8c7e663
Author: James Chien <jamesch...@staff.ruten.com.tw>
Authored: Wed May 17 11:15:31 2017 -0700
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed May 17 11:15:31 2017 -0700

----------------------------------------------------------------------
 .../streams/kstream/PrintForeachAction.java     |  59 ++++++
 .../kstream/internals/KStreamForeach.java       |  43 ----
 .../streams/kstream/internals/KStreamImpl.java  |   9 +-
 .../streams/kstream/internals/KStreamPeek.java  |   9 +-
 .../streams/kstream/internals/KStreamPrint.java |  89 ++++++++
 .../streams/kstream/internals/KTableImpl.java   |   9 +-
 .../kstream/internals/KeyValuePrinter.java      | 120 -----------
 .../kstream/internals/KStreamPeekTest.java      |   9 +-
 .../kstream/internals/KStreamPrintTest.java     |  91 ++++++++
 .../internals/KeyValuePrinterProcessorTest.java | 207 -------------------
 10 files changed, 262 insertions(+), 383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
new file mode 100644
index 0000000..3eb6d80
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.io.PrintWriter;
+
+public class PrintForeachAction<K, V> implements ForeachAction<K, V> {
+
+    private final String streamName;
+    private final PrintWriter printWriter;
+    
+    /**
+     * Print data messages with the given writer. The PrintWriter may be {@code null}, in which
+     * case {@code System.out} is used. A non-null writer is closed by {@code close()}, so passing
+     * {@code new PrintWriter(System.out)} would close the {@code System.out} output stream.
+     * <p>
+     * Therefore, do not pass in {@code new PrintWriter(System.out)}; pass {@code null} instead.
+     *
+     * @param printWriter Use {@code System.out.println} if {@code null}.
+     * @param streamName The given name will be printed.
+     */
+    public PrintForeachAction(final PrintWriter printWriter, final String 
streamName) {
+        this.printWriter = printWriter;
+        this.streamName = streamName;
+    }
+
+    @Override
+    public void apply(final K key, final V value) {
+        final String data = String.format("[%s]: %s, %s", streamName, key, 
value);
+        if (printWriter == null) {
+            System.out.println(data);
+        } else {
+            printWriter.println(data);
+        }
+    }
+
+    public void close() {
+        if (printWriter == null) {
+            System.out.flush();
+        } else {
+            printWriter.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
deleted file mode 100644
index eb3189c..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ForeachAction;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-class KStreamForeach<K, V> implements ProcessorSupplier<K, V> {
-
-    private final ForeachAction<K, V> action;
-
-    public KStreamForeach(ForeachAction<K, V> action) {
-        this.action = action;
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KStreamForeachProcessor();
-    }
-
-    private class KStreamForeachProcessor extends AbstractProcessor<K, V> {
-        @Override
-        public void process(K key, V value) {
-            action.apply(key, value);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b751294..38b0a85 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -193,7 +194,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
     public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) 
{
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, 
streamName), this.name);
+        topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(null, streamName), keySerde, valSerde), this.name);
     }
 
 
@@ -227,7 +228,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
         try {
             PrintWriter printWriter = null;
             printWriter = new PrintWriter(filePath, 
StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KeyValuePrinter<>(printWriter, 
keySerde, valSerde, streamName), this.name);
+            topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name);
         } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath 
+ "] " + e.getMessage();
             throw new TopologyBuilderException(message);
@@ -315,7 +316,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
         Objects.requireNonNull(action, "action can't be null");
         String name = topology.newName(FOREACH_NAME);
 
-        topology.addProcessor(name, new KStreamForeach<>(action), this.name);
+        topology.addProcessor(name, new KStreamPeek<>(action, false), 
this.name);
     }
 
     @Override
@@ -323,7 +324,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
         Objects.requireNonNull(action, "action can't be null");
         final String name = topology.newName(PEEK_NAME);
 
-        topology.addProcessor(name, new KStreamPeek<>(action), this.name);
+        topology.addProcessor(name, new KStreamPeek<>(action, true), 
this.name);
 
         return new KStreamImpl<>(topology, name, sourceNodes, 
repartitionRequired);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
index 2e72f68..44d1d60 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java
@@ -23,10 +23,12 @@ import org.apache.kafka.streams.processor.ProcessorSupplier;
 
 class KStreamPeek<K, V> implements ProcessorSupplier<K, V> {
 
+    private final boolean forwardDownStream;
     private final ForeachAction<K, V> action;
 
-    public KStreamPeek(final ForeachAction<K, V> action) {
+    public KStreamPeek(final ForeachAction<K, V> action, final boolean 
forwardDownStream) {
         this.action = action;
+        this.forwardDownStream = forwardDownStream;
     }
 
     @Override
@@ -38,7 +40,10 @@ class KStreamPeek<K, V> implements ProcessorSupplier<K, V> {
         @Override
         public void process(final K key, final V value) {
             action.apply(key, value);
-            context().forward(key, value);
+            if (forwardDownStream) {
+                context().forward(key, value);
+            }
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
new file mode 100644
index 0000000..8447ae1
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+
+public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> {
+
+    private final Serde<?> keySerde;
+    private final Serde<?> valueSerde;
+    private final ForeachAction<K, V> action;
+    
+    public KStreamPrint(final ForeachAction<K, V> action, final Serde<?> 
keySerde, final Serde<?> valueSerde) {
+        this.action = action;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+    }
+
+    @Override
+    public Processor<K, V> get() {
+        return new KStreamPrintProcessor(keySerde, valueSerde);
+    }
+
+    private class KStreamPrintProcessor extends AbstractProcessor<K, V> {
+        
+        private Serde<?> keySerde;
+        private Serde<?> valueSerde;
+        private ProcessorContext context;
+        
+        public KStreamPrintProcessor(final Serde<?> keySerde, final Serde<?> 
valueSerde) {
+            this.keySerde = keySerde;
+            this.valueSerde = valueSerde;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            this.context = context;
+            if (keySerde == null) {
+                this.keySerde = context.keySerde();
+            }
+            if (valueSerde == null) {
+                this.valueSerde = context.valueSerde();
+            }
+        }
+
+        @Override
+        public void process(final K key, final V value) {
+            final K deKey = (K) maybeDeserialize(key, keySerde.deserializer());
+            final V deValue = (V) maybeDeserialize(value, 
valueSerde.deserializer());
+            action.apply(deKey, deValue);
+        }
+
+        private Object maybeDeserialize(final Object keyOrValue, final 
Deserializer<?> deserializer) {
+            if (keyOrValue instanceof byte[]) {
+                return deserializer.deserialize(this.context.topic(), (byte[]) 
keyOrValue);
+            }
+            return keyOrValue;
+        }
+        
+        @Override
+        public void close() {
+            if (action instanceof PrintForeachAction) {
+                ((PrintForeachAction) action).close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 96a0b2c..2103ff2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.PrintForeachAction;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -242,7 +243,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
     public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) 
{
         String name = topology.newName(PRINTING_NAME);
         streamName = (streamName == null) ? this.name : streamName;
-        topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, 
streamName), this.name);
+        topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(null, streamName), keySerde, valSerde), this.name);
     }
 
     @Override
@@ -273,7 +274,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
         streamName = (streamName == null) ? this.name : streamName;
         try {
             PrintWriter printWriter = new PrintWriter(filePath, 
StandardCharsets.UTF_8.name());
-            topology.addProcessor(name, new KeyValuePrinter<>(printWriter, 
keySerde, valSerde, streamName), this.name);
+            topology.addProcessor(name, new KStreamPrint<>(new 
PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name);
         } catch (FileNotFoundException | UnsupportedEncodingException e) {
             String message = "Unable to write stream to file at [" + filePath 
+ "] " + e.getMessage();
             throw new TopologyBuilderException(message);
@@ -284,12 +285,12 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K> implements KTable<K,
     public void foreach(final ForeachAction<? super K, ? super V> action) {
         Objects.requireNonNull(action, "action can't be null");
         String name = topology.newName(FOREACH_NAME);
-        KStreamForeach<K, Change<V>> processorSupplier = new 
KStreamForeach<>(new ForeachAction<K, Change<V>>() {
+        KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new 
ForeachAction<K, Change<V>>() {
             @Override
             public void apply(K key, Change<V> value) {
                 action.apply(key, value.newValue);
             }
-        });
+        }, false);
         topology.addProcessor(name, processorSupplier, this.name);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
deleted file mode 100644
index c1d4382..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.processor.AbstractProcessor;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-import java.io.PrintWriter;
-
-
-class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> {
-    private final PrintWriter printWriter;
-    private Serde<?> keySerde;
-    private Serde<?> valueSerde;
-    private String streamName;
-
-    KeyValuePrinter(PrintWriter printWriter, Serde<?> keySerde, Serde<?> 
valueSerde, String streamName) {
-        this.keySerde = keySerde;
-        this.valueSerde = valueSerde;
-        this.streamName = streamName;
-        this.printWriter = printWriter;
-    }
-
-    KeyValuePrinter(PrintWriter printWriter, String streamName) {
-        this(printWriter, null, null, streamName);
-    }
-
-    KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde, String streamName) 
{
-        this(null, keySerde, valueSerde, streamName);
-    }
-
-    @Override
-    public Processor<K, V> get() {
-        return new KeyValuePrinterProcessor(this.printWriter, this.keySerde, 
this.valueSerde, this.streamName);
-    }
-
-    private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> {
-        private final PrintWriter printWriter;
-        private Serde<?> keySerde;
-        private Serde<?> valueSerde;
-        private ProcessorContext processorContext;
-        private String streamName;
-
-        private KeyValuePrinterProcessor(PrintWriter printWriter, Serde<?> 
keySerde, Serde<?> valueSerde, String streamName) {
-            this.printWriter = printWriter;
-            this.keySerde = keySerde;
-            this.valueSerde = valueSerde;
-            this.streamName = streamName;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            this.processorContext = context;
-
-            if (this.keySerde == null) {
-                keySerde = this.processorContext.keySerde();
-            }
-
-            if (this.valueSerde == null) {
-                valueSerde = this.processorContext.valueSerde();
-            }
-        }
-
-        @Override
-        public void process(K key, V value) {
-            K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer());
-            V valueToPrint = (V) maybeDeserialize(value, 
valueSerde.deserializer());
-
-            println("[" + this.streamName + "]: " + keyToPrint + " , " + 
valueToPrint);
-
-            this.processorContext.forward(key, value);
-        }
-
-        private void println(String str) {
-            if (printWriter == null)
-                System.out.println(str);
-            else
-                printWriter.println(str);
-        }
-
-        private Object maybeDeserialize(Object receivedElement, 
Deserializer<?> deserializer) {
-            if (receivedElement == null) {
-                return null;
-            }
-
-            if (receivedElement instanceof byte[]) {
-                return deserializer.deserialize(this.processorContext.topic(), 
(byte[]) receivedElement);
-            }
-
-            return receivedElement;
-        }
-
-        @Override
-        public void close() {
-            if (printWriter == null) {
-                System.out.flush();
-            } else {
-                printWriter.close();
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 7991a92..df6f765 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.test.KStreamTestDriver;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -34,7 +36,8 @@ import static org.junit.Assert.fail;
 public class KStreamPeekTest {
 
     private final String topicName = "topic";
-
+    private final Serde<Integer> intSerd = Serdes.Integer();
+    private final Serde<String> stringSerd = Serdes.String();
     private KStreamTestDriver driver = null;
 
     @After
@@ -47,7 +50,7 @@ public class KStreamPeekTest {
     @Test
     public void shouldObserveStreamElements() {
         final KStreamBuilder builder = new KStreamBuilder();
-        final KStream<Integer, String> stream = 
builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        final KStream<Integer, String> stream = builder.stream(intSerd, 
stringSerd, topicName);
         final List<KeyValue<Integer, String>> peekObserved = new 
ArrayList<>(), streamObserved = new ArrayList<>();
         stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
 
@@ -66,7 +69,7 @@ public class KStreamPeekTest {
     @Test
     public void shouldNotAllowNullAction() {
         final KStreamBuilder builder = new KStreamBuilder();
-        final KStream<Integer, String> stream = 
builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        final KStream<Integer, String> stream = builder.stream(intSerd, 
stringSerd, topicName);
         try {
             stream.peek(null);
             fail("expected null action to throw NPE");

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
new file mode 100644
index 0000000..c537e0a
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.PrintForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.KStreamTestDriver;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamPrintTest {
+
+    private final String topicName = "topic";
+    private final Serde<Integer> intSerd = Serdes.Integer();
+    private final Serde<String> stringSerd = Serdes.String();
+    private PrintWriter printWriter;
+    private ByteArrayOutputStream byteOutStream;
+    private KStreamTestDriver driver = null;
+
+
+    @Before
+    public void setUp() {
+        byteOutStream = new ByteArrayOutputStream();
+        printWriter = new PrintWriter(new OutputStreamWriter(byteOutStream, 
StandardCharsets.UTF_8));
+    }
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+        }
+    }
+    
+    @Test
+    public void testPrintKeyValueWithName() {
+        final KStreamPrint<Integer, String> kStreamPrint = new 
KStreamPrint<>(new PrintForeachAction(printWriter, "test-stream"), intSerd, 
stringSerd);
+
+        final List<KeyValue<Integer, String>> inputRecords = Arrays.asList(
+                new KeyValue<>(0, "zero"),
+                new KeyValue<>(1, "one"),
+                new KeyValue<>(2, "two"),
+                new KeyValue<>(3, "three"));
+        
+        final String[] expectedResult = {"[test-stream]: 0, zero", 
"[test-stream]: 1, one", "[test-stream]: 2, two", "[test-stream]: 3, three"};
+        
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<Integer, String> stream = builder.stream(intSerd, 
stringSerd, topicName);
+        stream.process(kStreamPrint);
+        
+        driver = new KStreamTestDriver(builder);
+        for (KeyValue<Integer, String> record: inputRecords) {
+            driver.process(topicName, record.key, record.value);
+        }
+        printWriter.flush();
+        final String[] flushOutDatas = new String(byteOutStream.toByteArray(), 
Charset.forName("UTF-8")).split("\n");
+        for (int i = 0; i < flushOutDatas.length; i++) {
+            assertEquals(flushOutDatas[i], expectedResult[i]);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
deleted file mode 100644
index 7de2b8b..0000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-
-public class KeyValuePrinterProcessorTest {
-
-    private final String topicName = "topic";
-    private final Serde<String> stringSerde = Serdes.String();
-    private ByteArrayOutputStream baos;
-    private KStreamBuilder builder;
-    private PrintWriter printWriter;
-    private KStreamTestDriver driver;
-
-    @Before
-    public void setup() {
-        baos = new ByteArrayOutputStream();
-        builder = new KStreamBuilder();
-        printWriter = new PrintWriter(new OutputStreamWriter(baos, 
StandardCharsets.UTF_8));
-        driver = null;
-    }
-
-    @After
-    public void cleanup() {
-        if (driver != null) {
-            driver.close();
-        }
-        driver = null;
-    }
-
-    @Test
-    public void testPrintKeyValueDefaultSerde() throws Exception {
-
-        KeyValuePrinter<String, String> keyValuePrinter = new 
KeyValuePrinter<>(printWriter, null);
-        String[] suppliedKeys = {"foo", "bar", null};
-        String[] suppliedValues = {"value1", "value2", "value3"};
-        String[] expectedValues = {"[null]: foo , value1", "[null]: bar , 
value2", "[null]: null , value3"};
-
-
-        KStream<String, String> stream = builder.stream(stringSerde, 
stringSerde, topicName);
-        stream.process(keyValuePrinter);
-
-        driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < suppliedKeys.length; i++) {
-            driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
-        }
-        printWriter.flush();
-        String[] capturedValues = new String(baos.toByteArray(), 
Charset.forName("UTF-8")).split("\n");
-
-        for (int i = 0; i < capturedValues.length; i++) {
-            assertEquals(capturedValues[i], expectedValues[i]);
-        }
-    }
-
-    @Test
-    public void testPrintKeyValuesWithName() throws Exception {
-
-        KeyValuePrinter<String, String> keyValuePrinter = new 
KeyValuePrinter<>(printWriter, "test-stream");
-        String[] suppliedKeys = {"foo", "bar", null};
-        String[] suppliedValues = {"value1", "value2", "value3"};
-        String[] expectedValues = {"[test-stream]: foo , value1", 
"[test-stream]: bar , value2", "[test-stream]: null , value3"};
-
-
-        KStream<String, String> stream = builder.stream(stringSerde, 
stringSerde, topicName);
-        stream.process(keyValuePrinter);
-
-        driver = new KStreamTestDriver(builder);
-        for (int i = 0; i < suppliedKeys.length; i++) {
-            driver.process(topicName, suppliedKeys[i], suppliedValues[i]);
-        }
-        printWriter.flush();
-        String[] capturedValues = new String(baos.toByteArray(), 
Charset.forName("UTF-8")).split("\n");
-
-        for (int i = 0; i < capturedValues.length; i++) {
-            assertEquals(capturedValues[i], expectedValues[i]);
-        }
-    }
-
-
-    @Test
-    public void testPrintKeyValueWithProvidedSerde() throws Exception {
-
-        Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new 
MockSerializer(), new MockDeserializer());
-        KeyValuePrinter<String, MockObject> keyValuePrinter = new 
KeyValuePrinter<>(printWriter, stringSerde, mockObjectSerde, null);
-        KStream<String, MockObject> stream = builder.stream(stringSerde, 
mockObjectSerde, topicName);
-
-        stream.process(keyValuePrinter);
-
-        driver = new KStreamTestDriver(builder);
-
-        String suppliedKey = null;
-        byte[] suppliedValue = "{\"name\":\"print\", 
\"label\":\"test\"}".getBytes(Charset.forName("UTF-8"));
-
-        driver.process(topicName, suppliedKey, suppliedValue);
-        printWriter.flush();
-        String capturedValue = new String(baos.toByteArray(), 
Charset.forName("UTF-8")).trim();
-
-        assertEquals("[null]: null , name:print label:test", capturedValue);
-
-    }
-
-    private static class MockObject {
-        public String name;
-        public String label;
-
-        public MockObject() {
-        }
-
-        MockObject(String name, String label) {
-            this.name = name;
-            this.label = label;
-        }
-
-        @Override
-        public String toString() {
-            return "name:" + name + " label:" + label;
-        }
-    }
-
-
-    private static class MockDeserializer implements Deserializer<MockObject> {
-
-        private com.fasterxml.jackson.databind.ObjectMapper objectMapper = new 
com.fasterxml.jackson.databind.ObjectMapper();
-
-        @Override
-        public void configure(Map<String, ?> configs, boolean isKey) {
-
-        }
-
-        @Override
-        public MockObject deserialize(String topic, byte[] data) {
-            MockObject mockObject;
-            try {
-                mockObject = objectMapper.readValue(data, MockObject.class);
-            } catch (Exception e) {
-                throw new SerializationException(e);
-            }
-            return mockObject;
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
-
-
-    private static class MockSerializer implements Serializer<MockObject> {
-        private final com.fasterxml.jackson.databind.ObjectMapper objectMapper 
= new com.fasterxml.jackson.databind.ObjectMapper();
-
-        @Override
-        public void configure(Map<String, ?> configs, boolean isKey) {
-
-        }
-
-        @Override
-        public byte[] serialize(String topic, MockObject data) {
-            try {
-                return objectMapper.writeValueAsBytes(data);
-            } catch (Exception e) {
-                throw new SerializationException("Error serializing JSON 
message", e);
-            }
-        }
-
-        @Override
-        public void close() {
-
-        }
-    }
-
-
-}
\ No newline at end of file

Reply via email to