This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9ecadc4  MINOR: Use Java 8 lambdas in KStreamImplTest (#6430)
9ecadc4 is described below

commit 9ecadc4df474d9cfbfda3256f01eba1423cf5902
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Tue Mar 12 16:35:25 2019 -0400

    MINOR: Use Java 8 lambdas in KStreamImplTest (#6430)
    
    Just a minor cleanup to use Java 8 lambdas instead of anonymous classes in this test.
    
    I ran all tests in the streams test suite
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wangg...@gmail.com>
---
 .../main/scala/kafka/tools/StreamsResetter.java    |  2 +-
 .../streams/kstream/internals/KStreamImplTest.java | 73 ++++------------------
 2 files changed, 14 insertions(+), 61 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 3666f67..71529f8 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -257,7 +257,7 @@ public class StreamsResetter {
             CommandLineUtils.printUsageAndDie(optionParser, "Only one of 
--dry-run and --execute can be specified");
         }
 
-        final scala.collection.immutable.HashSet<OptionSpec<?>> 
allScenarioOptions = new scala.collection.immutable.HashSet<OptionSpec<?>>();
+        final scala.collection.immutable.HashSet<OptionSpec<?>> 
allScenarioOptions = new scala.collection.immutable.HashSet<>();
         allScenarioOptions.$plus(toOffsetOption);
         allScenarioOptions.$plus(toDatetimeOption);
         allScenarioOptions.$plus(byDurationOption);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 76a01cd..bd2ab5b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -104,78 +104,31 @@ public class KStreamImplTest {
 
         final KStream<String, String> source2 = 
builder.stream(Arrays.asList("topic-3", "topic-4"), stringConsumed);
 
-        final KStream<String, String> stream1 =
-            source1.filter(new Predicate<String, String>() {
-                @Override
-                public boolean test(final String key, final String value) {
-                    return true;
-                }
-            }).filterNot(new Predicate<String, String>() {
-                @Override
-                public boolean test(final String key, final String value) {
-                    return false;
-                }
-            });
+        final KStream<String, String> stream1 = source1.filter((key, value) -> 
true)
+                                                       .filterNot((key, value) 
-> false);
 
-        final KStream<String, Integer> stream2 = stream1.mapValues(new 
ValueMapper<String, Integer>() {
-            @Override
-            public Integer apply(final String value) {
-                return new Integer(value);
-            }
-        });
+        final KStream<String, Integer> stream2 = 
stream1.mapValues(Integer::new);
 
-        final KStream<String, Integer> stream3 = source2.flatMapValues(new 
ValueMapper<String, Iterable<Integer>>() {
-            @Override
-            public Iterable<Integer> apply(final String value) {
-                return Collections.singletonList(new Integer(value));
-            }
-        });
+        final KStream<String, Integer> stream3 = 
source2.flatMapValues((ValueMapper<String, Iterable<Integer>>)
+            value -> Collections.singletonList(new Integer(value)));
 
         final KStream<String, Integer>[] streams2 = stream2.branch(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) 
{
-                        return (value % 2) == 0;
-                    }
-                },
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) 
{
-                        return true;
-                    }
-                }
+            (key, value) -> (value % 2) == 0,
+            (key, value) -> true
         );
 
         final KStream<String, Integer>[] streams3 = stream3.branch(
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) 
{
-                        return (value % 2) == 0;
-                    }
-                },
-                new Predicate<String, Integer>() {
-                    @Override
-                    public boolean test(final String key, final Integer value) 
{
-                        return true;
-                    }
-                }
+            (key, value) -> (value % 2) == 0,
+            (key, value) -> true
         );
 
         final int anyWindowSize = 1;
         final Joined<String, Integer, Integer> joined = 
Joined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer());
-        final KStream<String, Integer> stream4 = streams2[0].join(streams3[0], 
new ValueJoiner<Integer, Integer, Integer>() {
-            @Override
-            public Integer apply(final Integer value1, final Integer value2) {
-                return value1 + value2;
-            }
-        }, JoinWindows.of(ofMillis(anyWindowSize)), joined);
+        final KStream<String, Integer> stream4 = streams2[0].join(streams3[0],
+            (value1, value2) -> value1 + value2, 
JoinWindows.of(ofMillis(anyWindowSize)), joined);
 
-        streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, 
Integer>() {
-            @Override
-            public Integer apply(final Integer value1, final Integer value2) {
-                return value1 + value2;
-            }
-        }, JoinWindows.of(ofMillis(anyWindowSize)), joined);
+        streams2[1].join(streams3[1], (value1, value2) -> value1 + value2,
+            JoinWindows.of(ofMillis(anyWindowSize)), joined);
 
         stream4.to("topic-5");
 

Reply via email to