Repository: kafka Updated Branches: refs/heads/trunk 8c7e66313 -> 6910baf54
KAFKA-4772: Exploit #peek to implement #print() and other methods I removed the two classes `KeyValuePrinter` and `KStreamForeach` and reimplemented them on top of `KStreamPeek`, so `KStreamPeek` can now do the job of both `KeyValuePrinter` and `KStreamForeach`. Author: jameschien <jamesch...@staff.ruten.com.tw> Author: JamesChien <jedich...@users.noreply.github.com> Reviewers: Matthias J. Sax, Guozhang Wang Closes #2955 from jedichien/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6910baf5 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6910baf5 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6910baf5 Branch: refs/heads/trunk Commit: 6910baf548ebad4c1530432e51be40793b4a4f10 Parents: 8c7e663 Author: James Chien <jamesch...@staff.ruten.com.tw> Authored: Wed May 17 11:15:31 2017 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed May 17 11:15:31 2017 -0700 ---------------------------------------------------------------------- .../streams/kstream/PrintForeachAction.java | 59 ++++++ .../kstream/internals/KStreamForeach.java | 43 ---- .../streams/kstream/internals/KStreamImpl.java | 9 +- .../streams/kstream/internals/KStreamPeek.java | 9 +- .../streams/kstream/internals/KStreamPrint.java | 89 ++++++++ .../streams/kstream/internals/KTableImpl.java | 9 +- .../kstream/internals/KeyValuePrinter.java | 120 ----------- .../kstream/internals/KStreamPeekTest.java | 9 +- .../kstream/internals/KStreamPrintTest.java | 91 ++++++++ .../internals/KeyValuePrinterProcessorTest.java | 207 ------------------- 10 files changed, 262 insertions(+), 383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java new file mode 100644 index 0000000..3eb6d80 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/PrintForeachAction.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import java.io.PrintWriter; + +public class PrintForeachAction<K, V> implements ForeachAction<K, V> { + + private final String streamName; + private final PrintWriter printWriter; + + /** + * Print data message with given writer. The PrintWriter can be null in order to + * distinguish between {@code System.out} and the others. If the PrintWriter is {@code PrintWriter(System.out)}, + * then it would close {@code System.out} output stream. + * <p> + * Afterall, not to pass in {@code PrintWriter(System.out)} but {@code null} instead. + * + * @param printWriter Use {@code System.out.println} if {@code null}. + * @param streamName The given name will be printed. 
+ */ + public PrintForeachAction(final PrintWriter printWriter, final String streamName) { + this.printWriter = printWriter; + this.streamName = streamName; + } + + @Override + public void apply(final K key, final V value) { + final String data = String.format("[%s]: %s, %s", streamName, key, value); + if (printWriter == null) { + System.out.println(data); + } else { + printWriter.println(data); + } + } + + public void close() { + if (printWriter == null) { + System.out.flush(); + } else { + printWriter.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java deleted file mode 100644 index eb3189c..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamForeach.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamForeach<K, V> implements ProcessorSupplier<K, V> { - - private final ForeachAction<K, V> action; - - public KStreamForeach(ForeachAction<K, V> action) { - this.action = action; - } - - @Override - public Processor<K, V> get() { - return new KStreamForeachProcessor(); - } - - private class KStreamForeachProcessor extends AbstractProcessor<K, V> { - @Override - public void process(K key, V value) { - action.apply(key, value); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index b751294..38b0a85 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.PrintForeachAction; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.GlobalKTable; @@ -193,7 +194,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public void print(Serde<K> keySerde, Serde<V> valSerde, String streamName) { String name = 
topology.newName(PRINTING_NAME); streamName = (streamName == null) ? this.name : streamName; - topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName), this.name); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, streamName), keySerde, valSerde), this.name); } @@ -227,7 +228,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V try { PrintWriter printWriter = null; printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name()); - topology.addProcessor(name, new KeyValuePrinter<>(printWriter, keySerde, valSerde, streamName), this.name); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name); } catch (FileNotFoundException | UnsupportedEncodingException e) { String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage(); throw new TopologyBuilderException(message); @@ -315,7 +316,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V Objects.requireNonNull(action, "action can't be null"); String name = topology.newName(FOREACH_NAME); - topology.addProcessor(name, new KStreamForeach<>(action), this.name); + topology.addProcessor(name, new KStreamPeek<>(action, false), this.name); } @Override @@ -323,7 +324,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V Objects.requireNonNull(action, "action can't be null"); final String name = topology.newName(PEEK_NAME); - topology.addProcessor(name, new KStreamPeek<>(action), this.name); + topology.addProcessor(name, new KStreamPeek<>(action, true), this.name); return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired); } http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java index 2e72f68..44d1d60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java @@ -23,10 +23,12 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; class KStreamPeek<K, V> implements ProcessorSupplier<K, V> { + private final boolean forwardDownStream; private final ForeachAction<K, V> action; - public KStreamPeek(final ForeachAction<K, V> action) { + public KStreamPeek(final ForeachAction<K, V> action, final boolean forwardDownStream) { this.action = action; + this.forwardDownStream = forwardDownStream; } @Override @@ -38,7 +40,10 @@ class KStreamPeek<K, V> implements ProcessorSupplier<K, V> { @Override public void process(final K key, final V value) { action.apply(key, value); - context().forward(key, value); + if (forwardDownStream) { + context().forward(key, value); + } } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java new file mode 100644 index 0000000..8447ae1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPrint.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.PrintForeachAction; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; + +public class KStreamPrint<K, V> implements ProcessorSupplier<K, V> { + + private final Serde<?> keySerde; + private final Serde<?> valueSerde; + private final ForeachAction<K, V> action; + + public KStreamPrint(final ForeachAction<K, V> action, final Serde<?> keySerde, final Serde<?> valueSerde) { + this.action = action; + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + @Override + public Processor<K, V> get() { + return new KStreamPrintProcessor(keySerde, valueSerde); + } + + private class KStreamPrintProcessor extends AbstractProcessor<K, V> { + + private Serde<?> keySerde; + private Serde<?> valueSerde; + private ProcessorContext context; + + public KStreamPrintProcessor(final Serde<?> keySerde, final Serde<?> valueSerde) { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + @Override + public void init(ProcessorContext context) { 
+ this.context = context; + if (keySerde == null) { + this.keySerde = context.keySerde(); + } + if (valueSerde == null) { + this.valueSerde = context.valueSerde(); + } + } + + @Override + public void process(final K key, final V value) { + final K deKey = (K) maybeDeserialize(key, keySerde.deserializer()); + final V deValue = (V) maybeDeserialize(value, valueSerde.deserializer()); + action.apply(deKey, deValue); + } + + private Object maybeDeserialize(final Object keyOrValue, final Deserializer<?> deserializer) { + if (keyOrValue instanceof byte[]) { + return deserializer.deserialize(this.context.topic(), (byte[]) keyOrValue); + } + return keyOrValue; + } + + @Override + public void close() { + if (action instanceof PrintForeachAction) { + ((PrintForeachAction) action).close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 96a0b2c..2103ff2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.PrintForeachAction; import org.apache.kafka.streams.kstream.KGroupedTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -242,7 +243,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, public void print(Serde<K> keySerde, Serde<V> 
valSerde, String streamName) { String name = topology.newName(PRINTING_NAME); streamName = (streamName == null) ? this.name : streamName; - topology.addProcessor(name, new KeyValuePrinter<>(keySerde, valSerde, streamName), this.name); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(null, streamName), keySerde, valSerde), this.name); } @Override @@ -273,7 +274,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, streamName = (streamName == null) ? this.name : streamName; try { PrintWriter printWriter = new PrintWriter(filePath, StandardCharsets.UTF_8.name()); - topology.addProcessor(name, new KeyValuePrinter<>(printWriter, keySerde, valSerde, streamName), this.name); + topology.addProcessor(name, new KStreamPrint<>(new PrintForeachAction(printWriter, streamName), keySerde, valSerde), this.name); } catch (FileNotFoundException | UnsupportedEncodingException e) { String message = "Unable to write stream to file at [" + filePath + "] " + e.getMessage(); throw new TopologyBuilderException(message); @@ -284,12 +285,12 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, public void foreach(final ForeachAction<? super K, ? 
super V> action) { Objects.requireNonNull(action, "action can't be null"); String name = topology.newName(FOREACH_NAME); - KStreamForeach<K, Change<V>> processorSupplier = new KStreamForeach<>(new ForeachAction<K, Change<V>>() { + KStreamPeek<K, Change<V>> processorSupplier = new KStreamPeek<>(new ForeachAction<K, Change<V>>() { @Override public void apply(K key, Change<V> value) { action.apply(key, value.newValue); } - }); + }, false); topology.addProcessor(name, processorSupplier, this.name); } http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java deleted file mode 100644 index c1d4382..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinter.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -import java.io.PrintWriter; - - -class KeyValuePrinter<K, V> implements ProcessorSupplier<K, V> { - private final PrintWriter printWriter; - private Serde<?> keySerde; - private Serde<?> valueSerde; - private String streamName; - - KeyValuePrinter(PrintWriter printWriter, Serde<?> keySerde, Serde<?> valueSerde, String streamName) { - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.streamName = streamName; - this.printWriter = printWriter; - } - - KeyValuePrinter(PrintWriter printWriter, String streamName) { - this(printWriter, null, null, streamName); - } - - KeyValuePrinter(Serde<?> keySerde, Serde<?> valueSerde, String streamName) { - this(null, keySerde, valueSerde, streamName); - } - - @Override - public Processor<K, V> get() { - return new KeyValuePrinterProcessor(this.printWriter, this.keySerde, this.valueSerde, this.streamName); - } - - private class KeyValuePrinterProcessor extends AbstractProcessor<K, V> { - private final PrintWriter printWriter; - private Serde<?> keySerde; - private Serde<?> valueSerde; - private ProcessorContext processorContext; - private String streamName; - - private KeyValuePrinterProcessor(PrintWriter printWriter, Serde<?> keySerde, Serde<?> valueSerde, String streamName) { - this.printWriter = printWriter; - this.keySerde = keySerde; - this.valueSerde = valueSerde; - this.streamName = streamName; - } - - @Override - public void init(ProcessorContext context) { - this.processorContext = context; - - if (this.keySerde == null) { - keySerde = this.processorContext.keySerde(); - } - - if (this.valueSerde == 
null) { - valueSerde = this.processorContext.valueSerde(); - } - } - - @Override - public void process(K key, V value) { - K keyToPrint = (K) maybeDeserialize(key, keySerde.deserializer()); - V valueToPrint = (V) maybeDeserialize(value, valueSerde.deserializer()); - - println("[" + this.streamName + "]: " + keyToPrint + " , " + valueToPrint); - - this.processorContext.forward(key, value); - } - - private void println(String str) { - if (printWriter == null) - System.out.println(str); - else - printWriter.println(str); - } - - private Object maybeDeserialize(Object receivedElement, Deserializer<?> deserializer) { - if (receivedElement == null) { - return null; - } - - if (receivedElement instanceof byte[]) { - return deserializer.deserialize(this.processorContext.topic(), (byte[]) receivedElement); - } - - return receivedElement; - } - - @Override - public void close() { - if (printWriter == null) { - System.out.flush(); - } else { - printWriter.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java index 7991a92..df6f765 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import 
org.apache.kafka.test.KStreamTestDriver; + import org.junit.After; import org.junit.Test; @@ -34,7 +36,8 @@ import static org.junit.Assert.fail; public class KStreamPeekTest { private final String topicName = "topic"; - + private final Serde<Integer> intSerd = Serdes.Integer(); + private final Serde<String> stringSerd = Serdes.String(); private KStreamTestDriver driver = null; @After @@ -47,7 +50,7 @@ public class KStreamPeekTest { @Test public void shouldObserveStreamElements() { final KStreamBuilder builder = new KStreamBuilder(); - final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); + final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName); final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>(); stream.peek(collect(peekObserved)).foreach(collect(streamObserved)); @@ -66,7 +69,7 @@ public class KStreamPeekTest { @Test public void shouldNotAllowNullAction() { final KStreamBuilder builder = new KStreamBuilder(); - final KStream<Integer, String> stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); + final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName); try { stream.peek(null); fail("expected null action to throw NPE"); http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java new file mode 100644 index 0000000..c537e0a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPrintTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.PrintForeachAction; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.test.KStreamTestDriver; + +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +import java.io.PrintWriter; +import java.io.ByteArrayOutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class KStreamPrintTest { + + private final String topicName = "topic"; + private final Serde<Integer> intSerd = Serdes.Integer(); + private final Serde<String> stringSerd = Serdes.String(); + private PrintWriter printWriter; + private ByteArrayOutputStream byteOutStream; + private KStreamTestDriver driver = null; + + + @Before + public void setUp() { + byteOutStream = new ByteArrayOutputStream(); + printWriter = new PrintWriter(new OutputStreamWriter(byteOutStream, StandardCharsets.UTF_8)); 
+ } + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + } + + @Test + public void testPrintKeyValueWithName() { + final KStreamPrint<Integer, String> kStreamPrint = new KStreamPrint<>(new PrintForeachAction(printWriter, "test-stream"), intSerd, stringSerd); + + final List<KeyValue<Integer, String>> inputRecords = Arrays.asList( + new KeyValue<>(0, "zero"), + new KeyValue<>(1, "one"), + new KeyValue<>(2, "two"), + new KeyValue<>(3, "three")); + + final String[] expectedResult = {"[test-stream]: 0, zero", "[test-stream]: 1, one", "[test-stream]: 2, two", "[test-stream]: 3, three"}; + + final KStreamBuilder builder = new KStreamBuilder(); + final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName); + stream.process(kStreamPrint); + + driver = new KStreamTestDriver(builder); + for (KeyValue<Integer, String> record: inputRecords) { + driver.process(topicName, record.key, record.value); + } + printWriter.flush(); + final String[] flushOutDatas = new String(byteOutStream.toByteArray(), Charset.forName("UTF-8")).split("\n"); + for (int i = 0; i < flushOutDatas.length; i++) { + assertEquals(flushOutDatas[i], expectedResult[i]); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/6910baf5/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java deleted file mode 100644 index 7de2b8b..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KeyValuePrinterProcessorTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - - -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.test.KStreamTestDriver; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.ByteArrayOutputStream; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class KeyValuePrinterProcessorTest { - - private final String topicName = "topic"; - private final Serde<String> stringSerde = Serdes.String(); - private ByteArrayOutputStream baos; - private KStreamBuilder builder; - private PrintWriter printWriter; - private KStreamTestDriver driver; - - @Before - public void setup() { - baos = new ByteArrayOutputStream(); - builder = new KStreamBuilder(); - printWriter = new PrintWriter(new OutputStreamWriter(baos, 
StandardCharsets.UTF_8)); - driver = null; - } - - @After - public void cleanup() { - if (driver != null) { - driver.close(); - } - driver = null; - } - - @Test - public void testPrintKeyValueDefaultSerde() throws Exception { - - KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printWriter, null); - String[] suppliedKeys = {"foo", "bar", null}; - String[] suppliedValues = {"value1", "value2", "value3"}; - String[] expectedValues = {"[null]: foo , value1", "[null]: bar , value2", "[null]: null , value3"}; - - - KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName); - stream.process(keyValuePrinter); - - driver = new KStreamTestDriver(builder); - for (int i = 0; i < suppliedKeys.length; i++) { - driver.process(topicName, suppliedKeys[i], suppliedValues[i]); - } - printWriter.flush(); - String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n"); - - for (int i = 0; i < capturedValues.length; i++) { - assertEquals(capturedValues[i], expectedValues[i]); - } - } - - @Test - public void testPrintKeyValuesWithName() throws Exception { - - KeyValuePrinter<String, String> keyValuePrinter = new KeyValuePrinter<>(printWriter, "test-stream"); - String[] suppliedKeys = {"foo", "bar", null}; - String[] suppliedValues = {"value1", "value2", "value3"}; - String[] expectedValues = {"[test-stream]: foo , value1", "[test-stream]: bar , value2", "[test-stream]: null , value3"}; - - - KStream<String, String> stream = builder.stream(stringSerde, stringSerde, topicName); - stream.process(keyValuePrinter); - - driver = new KStreamTestDriver(builder); - for (int i = 0; i < suppliedKeys.length; i++) { - driver.process(topicName, suppliedKeys[i], suppliedValues[i]); - } - printWriter.flush(); - String[] capturedValues = new String(baos.toByteArray(), Charset.forName("UTF-8")).split("\n"); - - for (int i = 0; i < capturedValues.length; i++) { - assertEquals(capturedValues[i], expectedValues[i]); - } - } - 
- - @Test - public void testPrintKeyValueWithProvidedSerde() throws Exception { - - Serde<MockObject> mockObjectSerde = Serdes.serdeFrom(new MockSerializer(), new MockDeserializer()); - KeyValuePrinter<String, MockObject> keyValuePrinter = new KeyValuePrinter<>(printWriter, stringSerde, mockObjectSerde, null); - KStream<String, MockObject> stream = builder.stream(stringSerde, mockObjectSerde, topicName); - - stream.process(keyValuePrinter); - - driver = new KStreamTestDriver(builder); - - String suppliedKey = null; - byte[] suppliedValue = "{\"name\":\"print\", \"label\":\"test\"}".getBytes(Charset.forName("UTF-8")); - - driver.process(topicName, suppliedKey, suppliedValue); - printWriter.flush(); - String capturedValue = new String(baos.toByteArray(), Charset.forName("UTF-8")).trim(); - - assertEquals("[null]: null , name:print label:test", capturedValue); - - } - - private static class MockObject { - public String name; - public String label; - - public MockObject() { - } - - MockObject(String name, String label) { - this.name = name; - this.label = label; - } - - @Override - public String toString() { - return "name:" + name + " label:" + label; - } - } - - - private static class MockDeserializer implements Deserializer<MockObject> { - - private com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper(); - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - - } - - @Override - public MockObject deserialize(String topic, byte[] data) { - MockObject mockObject; - try { - mockObject = objectMapper.readValue(data, MockObject.class); - } catch (Exception e) { - throw new SerializationException(e); - } - return mockObject; - } - - @Override - public void close() { - - } - } - - - private static class MockSerializer implements Serializer<MockObject> { - private final com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper(); - - @Override - 
public void configure(Map<String, ?> configs, boolean isKey) { - - } - - @Override - public byte[] serialize(String topic, MockObject data) { - try { - return objectMapper.writeValueAsBytes(data); - } catch (Exception e) { - throw new SerializationException("Error serializing JSON message", e); - } - } - - @Override - public void close() { - - } - } - - -} \ No newline at end of file