Hello!
I am trying to use the RedisIO connector with Redis cluster but it looks like 
the Jedis client that RedisIO uses only works on a standalone Redis server, not 
on a cluster. I get this error when trying to read from Redis:


Exception in thread "main" 
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
redis.clients.jedis.exceptions.JedisMovedDa
taException: MOVED 15000 172.16.2.3:6379
        at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:371)
        at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:339)
        at 
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:219)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
       at com.oracle.quanta.RedisToAtp.run(RedisToAtp.java:196)
        at com.oracle.quanta.RedisToAtp.main(RedisToAtp.java:54)
Caused by: redis.clients.jedis.exceptions.JedisMovedDataException: MOVED 15000 
172.16.2.3:6379
        at redis.clients.jedis.Protocol.processError(Protocol.java:116)
        at redis.clients.jedis.Protocol.process(Protocol.java:166)
        at redis.clients.jedis.Protocol.read(Protocol.java:220)
        at 
redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:278)
        at 
redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:230)
        at redis.clients.jedis.Connection.getMultiBulkReply(Connection.java:224)
        at redis.clients.jedis.Jedis.mget(Jedis.java:474)
        at 
org.apache.beam.sdk.io.redis.RedisIO$ReadFn.fetchAndFlush(RedisIO.java:517)
        at 
org.apache.beam.sdk.io.redis.RedisIO$ReadFn.finishBundle(RedisIO.java:500)


This is the code that I use:


        PCollection<Event> events =

                pipeline

                        /*

                         * Step #1: Read from Redis.

                         */

                        .apply("Read Redis KV Store", RedisIO.read()

                                .withEndpoint(redisHost, 6379)

                                .withKeyPattern(redisKeyPattern))




Is there a way to configure RedisIO to work with a  cluster? I would have 
expected it to use JedisCluster when working with Redis in cluster mode but 
from 
RedisIO.java<https://github.com/apache/beam/blob/master/sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisIO.java>
 it appears that it only uses the standalone Jedis client.

Thank you!
~ Gaurav


Reply via email to