This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e2f06a  [BEAM-6483] Add support for SADD operation to RedisIO.Write
     new 664e5ab  Merge pull request #7587: [BEAM-6483] Add support for SADD operation to RedisIO.Write
3e2f06a is described below

commit 3e2f06a8f1634220e87649969e16d5b5d47aac12
Author: Kengo Seki <sek...@apache.org>
AuthorDate: Tue Jan 22 10:11:10 2019 -0800

    [BEAM-6483] Add support for SADD operation to RedisIO.Write
    
    RedisIO.Write currently supports write methods for strings (APPEND, SET),
    lists (LPUSH, RPUSH) and HyperLogLog (PFADD), but not for sets (SADD).
    This PR adds SADD support. In addition, it makes the following cleanups:
    
    * make the input value check for port number stricter
    * replace a magic number indicating the end of a loop with a constant
    * remove an unnecessary argument from writeUsingHLLCommand,
      which is a private method used only internally
---
 .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 20 ++++++++++++----
 .../org/apache/beam/sdk/io/redis/RedisIOTest.java  | 28 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
index 8d6b0be..06ba187 100644
--- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
+++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java
@@ -167,7 +167,7 @@ public class RedisIO {
 
     public Read withEndpoint(String host, int port) {
       checkArgument(host != null, "host can not be null");
-      checkArgument(port > 0, "port can not be negative or 0");
+      checkArgument(0 < port && port < 65536, "port must be a positive integer less than 65536");
       return builder()
           
.setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port))
           .build();
@@ -320,7 +320,7 @@ public class RedisIO {
           processContext.output(k);
         }
         cursor = scanResult.getStringCursor();
-        if ("0".equals(cursor)) {
+        if (cursor.equals(ScanParams.SCAN_POINTER_START)) {
           finished = true;
         }
       }
@@ -446,6 +446,9 @@ public class RedisIO {
        */
       RPUSH,
 
+      /** Use SADD command. Insert value in a set. Duplicated values are ignored. */
+      SADD,
+
       /** Use PFADD command. Insert value in a HLL structure. Create key if it 
doesn't exist */
       PFADD
     }
@@ -570,8 +573,10 @@ public class RedisIO {
           writeUsingSetCommand(record, expireTime);
         } else if (Method.LPUSH == method || Method.RPUSH == method) {
           writeUsingListCommand(record, method, expireTime);
+        } else if (Method.SADD == method) {
+          writeUsingSaddCommand(record, expireTime);
         } else if (Method.PFADD == method) {
-          writeUsingHLLCommand(record, method, expireTime);
+          writeUsingHLLCommand(record, expireTime);
         }
       }
 
@@ -610,7 +615,14 @@ public class RedisIO {
         setExpireTimeWhenRequired(key, expireTime);
       }
 
-      private void writeUsingHLLCommand(KV<String, String> record, Method method, Long expireTime) {
+      private void writeUsingSaddCommand(KV<String, String> record, Long expireTime) {
+        String key = record.getKey();
+        String value = record.getValue();
+
+        pipeline.sadd(key, value);
+      }
+
+      private void writeUsingHLLCommand(KV<String, String> record, Long expireTime) {
         String key = record.getKey();
         String value = record.getValue();
 
diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
index 529a854..c33355c 100644
--- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
+++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java
@@ -20,8 +20,10 @@ package org.apache.beam.sdk.io.redis;
 import java.io.IOException;
 import java.net.ServerSocket;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.beam.sdk.io.redis.RedisIO.Write.Method;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -29,6 +31,8 @@ import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -187,6 +191,30 @@ public class RedisIOTest {
   }
 
   @Test
+  public void testWriteReadUsingSaddMethod() throws Exception {
+    String key = "key";
+
+    Jedis jedis =
+        RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect();
+
+    List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5");
+    List<KV<String, String>> kvs = Lists.newArrayList();
+    for (String value : values) {
+      kvs.add(KV.of(key, value));
+    }
+
+    PCollection<KV<String, String>> write = writePipeline.apply(Create.of(kvs));
+    write.apply(
+        RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.SADD));
+
+    writePipeline.run();
+
+    Set<String> expected = Sets.newHashSet(values);
+    Set<String> members = jedis.smembers(key);
+    Assert.assertEquals(expected, members);
+  }
+
+  @Test
   public void testWriteUsingHLLMethod() throws Exception {
     String key = "key";
 

Reply via email to