This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 3e2f06a [BEAM-6483] Add support for SADD operation to RedisIO.Write new 664e5ab Merge pull request #7587: [BEAM-6483] Add support for SADD operation to RedisIO.Write 3e2f06a is described below commit 3e2f06a8f1634220e87649969e16d5b5d47aac12 Author: Kengo Seki <sek...@apache.org> AuthorDate: Tue Jan 22 10:11:10 2019 -0800 [BEAM-6483] Add support for SADD operation to RedisIO.Write For now, RedisIO.Write supports write methods for string (APPEND, SET), list (LPUSH, RPUSH) and HyperLogLog (PFADD), but not for set (SADD). This PR adds it. In addition, I did the following refactoring in this: * make the input value check for port number stricter * replace a magic number indicating the end of a loop with a constant * remove an unnecessary argument from writeUsingHLLCommand, which is a private method used only internally --- .../java/org/apache/beam/sdk/io/redis/RedisIO.java | 20 ++++++++++++---- .../org/apache/beam/sdk/io/redis/RedisIOTest.java | 28 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java index 8d6b0be..06ba187 100644 --- a/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java +++ b/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java @@ -167,7 +167,7 @@ public class RedisIO { public Read withEndpoint(String host, int port) { checkArgument(host != null, "host can not be null"); - checkArgument(port > 0, "port can not be negative or 0"); + checkArgument(0 < port && port < 65536, "port must be a positive integer less than 65536"); return builder() .setConnectionConfiguration(connectionConfiguration().withHost(host).withPort(port)) .build(); @@ -320,7 +320,7 @@ public class RedisIO { processContext.output(k); } cursor = scanResult.getStringCursor(); - if ("0".equals(cursor)) { + if (cursor.equals(ScanParams.SCAN_POINTER_START)) { finished = true; } } @@ -446,6 +446,9 @@ public class RedisIO { */ RPUSH, + /** Use SADD command. Insert value in a set. Duplicated values are ignored. */ + SADD, + /** Use PFADD command. Insert value in a HLL structure. Create key if it doesn't exist */ PFADD } @@ -570,8 +573,10 @@ public class RedisIO { writeUsingSetCommand(record, expireTime); } else if (Method.LPUSH == method || Method.RPUSH == method) { writeUsingListCommand(record, method, expireTime); + } else if (Method.SADD == method) { + writeUsingSaddCommand(record, expireTime); } else if (Method.PFADD == method) { - writeUsingHLLCommand(record, method, expireTime); + writeUsingHLLCommand(record, expireTime); } } @@ -610,7 +615,14 @@ public class RedisIO { setExpireTimeWhenRequired(key, expireTime); } - private void writeUsingHLLCommand(KV<String, String> record, Method method, Long expireTime) { + private void writeUsingSaddCommand(KV<String, String> record, Long expireTime) { + String key = record.getKey(); + String value = record.getValue(); + + pipeline.sadd(key, value); + } + + private void writeUsingHLLCommand(KV<String, String> record, Long expireTime) { String key = record.getKey(); String value = record.getValue(); diff --git a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java index 529a854..c33355c 100644 --- a/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java +++ b/sdks/java/io/redis/src/test/java/org/apache/beam/sdk/io/redis/RedisIOTest.java @@ -20,8 +20,10 @@ package org.apache.beam.sdk.io.redis; import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.beam.sdk.io.redis.RedisIO.Write.Method; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -29,6 +31,8 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -187,6 +191,30 @@ public class RedisIOTest { } @Test + public void testWriteReadUsingSaddMethod() throws Exception { + String key = "key"; + + Jedis jedis = + RedisConnectionConfiguration.create(REDIS_HOST, embeddedRedis.getPort()).connect(); + + List<String> values = Arrays.asList("0", "1", "2", "3", "2", "4", "0", "5"); + List<KV<String, String>> kvs = Lists.newArrayList(); + for (String value : values) { + kvs.add(KV.of(key, value)); + } + + PCollection<KV<String, String>> write = writePipeline.apply(Create.of(kvs)); + write.apply( + RedisIO.write().withEndpoint(REDIS_HOST, embeddedRedis.getPort()).withMethod(Method.SADD)); + + writePipeline.run(); + + Set<String> expected = Sets.newHashSet(values); + Set<String> members = jedis.smembers(key); + Assert.assertEquals(expected, members); + } + + @Test public void testWriteUsingHLLMethod() throws Exception { String key = "key";