upthewaterspout commented on a change in pull request #6467:
URL: https://github.com/apache/geode/pull/6467#discussion_r633715603



##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
##########
@@ -79,9 +80,12 @@ public GeodeRedisServer(String bindAddress, int port, 
InternalCache cache) {
     redisStats = createStats(cache);
     StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
     regionProvider = new RegionProvider(cache);
+    CommandHelper commandHelper =
+        new CommandHelper(regionProvider.getDataRegion(), redisStats, 
stripedExecutor);
 
     CommandFunction.register(regionProvider.getDataRegion(), stripedExecutor, 
redisStats);
     RenameFunction.register(regionProvider.getDataRegion(), stripedExecutor, 
redisStats);
+    RedisMemberInfoRetrievalFunction infoFunction = 
RedisMemberInfoRetrievalFunction.register();

Review comment:
       I think we need to make sure this is registered before the data region 
is created, so probably before the regionProvider initialization?

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/SlotAdvisor.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
+import 
org.apache.geode.redis.internal.cluster.RedisMemberInfoRetrievalFunction;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class SlotAdvisor {
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int HOSTPORT_RETRIEVAL_ATTEMPTS = 20;
+  private static final int HOSTPORT_RETRIEVAL_INTERVAL = 100;
+
+  /**
+   * Cache of member ids to member IP address and redis listening port
+   */
+  private final Map<DistributedMember, RedisMemberInfo> memberInfos = new 
HashMap<>();
+  private final PartitionedRegion dataRegion;
+
+  SlotAdvisor(Region<RedisKey, RedisData> dataRegion) {
+    this.dataRegion = (PartitionedRegion) dataRegion;
+  }
+
+  public boolean isLocal(RedisKey key) {

Review comment:
       I think this method is unused?

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/SlotAdvisor.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+import static 
org.apache.geode.redis.internal.cluster.RedisMemberInfoRetrievalFunction.RedisMemberInfo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.partition.PartitionMemberInfo;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.partition.PartitionRegionInfo;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.redis.internal.cluster.RedisMemberInfoRetrievalFunction;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class SlotAdvisor {
+
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * Mapping buckets to slot information
+   */
+  private final List<MemberBucketSlot> memberBucketSlots;
+
+  /**
+   * Cache of member ids to member IP address and redis listening port
+   */
+  private final Map<String, Pair<String, Integer>> hostPorts = new HashMap<>();
+  private final PartitionedRegion dataRegion;
+  private final Region<String, Object> configRegion;
+
+  SlotAdvisor(Region<RedisKey, RedisData> dataRegion, Region<String, Object> 
configRegion) {
+    this.dataRegion = (PartitionedRegion) dataRegion;
+    this.configRegion = configRegion;
+    memberBucketSlots = new ArrayList<>(RegionProvider.REDIS_REGION_BUCKETS);
+    for (int i = 0; i < REDIS_REGION_BUCKETS; i++) {
+      memberBucketSlots.add(null);
+    }
+  }
+
+  public boolean isLocal(RedisKey key) {
+    return 
dataRegion.getRegionAdvisor().getBucket(key.getBucketId()).getBucketAdvisor()
+        .isPrimary();
+  }
+
+  public Pair<String, Integer> getHostAndPortForKey(RedisKey key) {
+    int bucketId = key.getBucketId();
+    MemberBucketSlot mbs = updateBucketDetails(bucketId);
+
+    return Pair.of(mbs.getPrimaryIpAddress(), mbs.getPrimaryPort());
+  }
+
+  public Map<String, List<Integer>> getMemberBuckets() {
+    initializeBucketsIfNecessary();
+
+    Map<String, List<Integer>> memberBuckets = new HashMap<>();
+    for (int bucketId = 0; bucketId < REDIS_REGION_BUCKETS; bucketId++) {
+      String memberId =
+          
dataRegion.getRegionAdvisor().getBucketAdvisor(bucketId).getPrimary().getUniqueId();
+      memberBuckets.computeIfAbsent(memberId, k -> new 
ArrayList<>()).add(bucketId);
+    }
+
+    return memberBuckets;
+  }
+
+  public synchronized List<MemberBucketSlot> getBucketSlots() {
+    initializeBucketsIfNecessary();
+
+    for (int bucketId = 0; bucketId < REDIS_REGION_BUCKETS; bucketId++) {
+      updateBucketDetails(bucketId);
+    }
+
+    return Collections.unmodifiableList(memberBucketSlots);
+  }
+
+  private synchronized MemberBucketSlot updateBucketDetails(int bucketId) {
+    MemberBucketSlot mbs = null;
+    try {
+      Pair<String, Integer> hostPort = getHostPort(bucketId);
+
+      mbs = memberBucketSlots.get(bucketId);
+      if (mbs == null) {
+        mbs = new MemberBucketSlot(bucketId, hostPort.getLeft(), 
hostPort.getRight());
+        memberBucketSlots.set(bucketId, mbs);
+      } else {
+        mbs.setPrimaryIpAddress(hostPort.getLeft());
+        mbs.setPrimaryPort(hostPort.getRight());
+      }
+    } catch (Exception ex) {
+      logger.error("Unable to update bucket detail for bucketId: {}", 
bucketId, ex);
+    }
+
+    return mbs;
+  }
+
+  private void initializeBucketsIfNecessary() {
+    if (dataRegion.getDataStore() != null &&
+        dataRegion.getDataStore().getAllLocalBucketIds().isEmpty()) {
+      PartitionRegionHelper.assignBucketsToPartitions(dataRegion);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Pair<String, Integer> getHostPort(int bucketId) {
+    String memberId =
+        
dataRegion.getRegionAdvisor().getBucketAdvisor(bucketId).getPrimary().getUniqueId();
+
+    if (hostPorts.containsKey(memberId)) {
+      return hostPorts.get(memberId);
+    }
+
+    Set<DistributedMember> membersWithDataRegion = new HashSet<>();
+    for (PartitionMemberInfo memberInfo : getRegionMembers(dataRegion)) {
+      membersWithDataRegion.add(memberInfo.getDistributedMember());
+    }
+
+    ResultCollector<RedisMemberInfo, List<RedisMemberInfo>> resultCollector =
+        FunctionService.onMembers(membersWithDataRegion)
+            .execute(RedisMemberInfoRetrievalFunction.ID);
+
+    hostPorts.clear();
+
+    for (RedisMemberInfo memberInfo : resultCollector.getResult()) {
+      Pair<String, Integer> hostPort =
+          Pair.of(memberInfo.getHostAddress(), memberInfo.getRedisPort());
+      hostPorts.put(memberInfo.getMemberId(), hostPort);
+    }
+
+    if (!hostPorts.containsKey(memberId)) {
+      // There is a very tiny window where this might happen - a member was 
hosting a bucket and
+      // died before the fn call above could even complete.
+      throw new RuntimeException("Unable to retrieve host and redis port for 
member: " + memberId);
+    }
+
+    return hostPorts.get(memberId);
+  }
+
+  private Set<PartitionMemberInfo> getRegionMembers(PartitionedRegion 
dataRegion) {
+    PartitionRegionInfo info = 
PartitionRegionHelper.getPartitionRegionInfo(dataRegion);
+    assert info != null; // Mostly to appease IJ since the region is always a 
PR
+
+    return info.getPartitionMemberInfo();
+  }
+
+  public static class MemberBucketSlot {

Review comment:
       Hmm, these still look like `Integer`. Maybe forgot to push something?

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/SlotAdvisor.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
+import 
org.apache.geode.redis.internal.cluster.RedisMemberInfoRetrievalFunction;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class SlotAdvisor {
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int HOSTPORT_RETRIEVAL_ATTEMPTS = 20;
+  private static final int HOSTPORT_RETRIEVAL_INTERVAL = 100;
+
+  /**
+   * Cache of member ids to member IP address and redis listening port
+   */
+  private final Map<DistributedMember, RedisMemberInfo> memberInfos = new 
HashMap<>();
+  private final PartitionedRegion dataRegion;
+
+  SlotAdvisor(Region<RedisKey, RedisData> dataRegion) {
+    this.dataRegion = (PartitionedRegion) dataRegion;
+  }
+
+  public boolean isLocal(RedisKey key) {
+    return 
dataRegion.getRegionAdvisor().getBucket(key.getBucketId()).getBucketAdvisor()
+        .isPrimary();
+  }
+
+  public Pair<String, Integer> getHostAndPortForKey(RedisKey key) {
+    try {
+      RedisMemberInfo memberInfo = getMemberInfo(key.getBucketId());
+      return Pair.of(memberInfo.getHostAddress(), memberInfo.getRedisPort());
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  public Map<String, List<Integer>> getMemberBuckets() {
+    Map<String, List<Integer>> memberBuckets = new HashMap<>();
+    for (int bucketId = 0; bucketId < REDIS_REGION_BUCKETS; bucketId++) {
+      String memberId = getOrCreateMember(bucketId).getUniqueId();
+      memberBuckets.computeIfAbsent(memberId, k -> new 
ArrayList<>()).add(bucketId);
+    }
+
+    return memberBuckets;
+  }
+
+  /**
+   * This returns a list of {@link MemberBucketSlot}s where each entry 
corresponds to a bucket. If
+   * the details for a given bucket cannot be determined, that entry will 
contain {@code null}.
+   */
+  public synchronized List<MemberBucketSlot> getBucketSlots() {
+    List<MemberBucketSlot> memberBucketSlots = new 
ArrayList<>(RegionProvider.REDIS_REGION_BUCKETS);
+    try {
+      for (int bucketId = 0; bucketId < REDIS_REGION_BUCKETS; bucketId++) {
+        RedisMemberInfo memberInfo = getMemberInfo(bucketId);
+        if (memberInfo != null) {
+          memberBucketSlots.add(
+              new MemberBucketSlot(bucketId, memberInfo.getHostAddress(),
+                  memberInfo.getRedisPort()));
+        }
+      }
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ex);
+    }
+
+    return memberBucketSlots;
+  }
+
+  private InternalDistributedMember getOrCreateMember(int bucketId) {
+    return dataRegion.getOrCreateNodeForBucketWrite(bucketId, null);
+  }
+
+  /**
+   * This method will retry for {@link #HOSTPORT_RETRIEVAL_ATTEMPTS} attempts 
and return null if
+   * no information could be retrieved.
+   */
+  private RedisMemberInfo getMemberInfo(int bucketId) throws 
InterruptedException {
+    RedisMemberInfo response;
+
+    for (int i = 0; i < HOSTPORT_RETRIEVAL_ATTEMPTS; i++) {
+      response = getMemberInfo0(bucketId);
+      if (response != null) {
+        return response;
+      }
+
+      Thread.sleep(HOSTPORT_RETRIEVAL_INTERVAL);
+    }
+
+    logger.error("Unable to retrieve host and redis port for member with 
bucketId: {}", bucketId);
+
+    return null;
+  }
+
+  @SuppressWarnings("unchecked")
+  private RedisMemberInfo getMemberInfo0(int bucketId) {
+    InternalDistributedMember member = getOrCreateMember(bucketId);
+
+    if (memberInfos.containsKey(member)) {
+      return memberInfos.get(member);
+    }
+
+    List<RedisMemberInfo> memberInfos;
+    try {
+      ResultCollector<RedisMemberInfo, List<RedisMemberInfo>> resultCollector =
+          
FunctionService.onRegion(dataRegion).execute(RedisMemberInfoRetrievalFunction.ID);
+      memberInfos = resultCollector.getResult();
+    } catch (Exception e) {

Review comment:
       Since you've changed the function to be registered earlier, I don't 
think it makes sense to catch and ignore function failures anymore.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/cluster/RedisMemberInfoRetrievalFunction.java
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.cluster;
+
+import java.net.InetAddress;
+
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.internal.inet.LocalHostUtil;
+
+/**
+ * There is a race between registering this function and redis clients being 
able to call
+ * {@code CLUSTER SLOTS} and receive accurate information. To help with this 
the function is
+ * registered before the redis server is available, but is only initialized 
with the host and port
+ * once the redis server is running. If the function returns null it means 
that the function has
+ * not been initialized yet and callers, (from java), should retry.
+ */
+public class RedisMemberInfoRetrievalFunction implements 
InternalFunction<Void> {
+
+  public static final String ID = 
RedisMemberInfoRetrievalFunction.class.getName();
+  private static final long serialVersionUID = 2207969011229079993L;
+
+  private String hostAddress = null;
+  private Integer redisPort = null;

Review comment:
       `Integer` again.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/SlotAdvisor.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
+import 
org.apache.geode.redis.internal.cluster.RedisMemberInfoRetrievalFunction;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class SlotAdvisor {
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int HOSTPORT_RETRIEVAL_ATTEMPTS = 10;
+  private static final int HOSTPORT_RETRIEVAL_INTERVAL = 6_000;

Review comment:
       Seems reasonable.

##########
File path: 
geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/SlotAdvisor.java
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static 
org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.RedisMemberInfo;
+import 
org.apache.geode.redis.internal.cluster.RedisMemberInfoRetrievalFunction;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+
+public class SlotAdvisor {
+
+  private static final Logger logger = LogService.getLogger();
+  private static final int HOSTPORT_RETRIEVAL_ATTEMPTS = 10;
+  private static final int HOSTPORT_RETRIEVAL_INTERVAL = 6_000;
+
+  /**
+   * Cache of member ids to member IP address and redis listening port
+   */
+  private final Map<DistributedMember, Pair<String, Integer>> hostPorts = new 
HashMap<>();
+  private final PartitionedRegion dataRegion;
+
+  SlotAdvisor(Region<RedisKey, RedisData> dataRegion) {
+    this.dataRegion = (PartitionedRegion) dataRegion;
+  }
+
+  public boolean isLocal(RedisKey key) {
+    return 
dataRegion.getRegionAdvisor().getBucket(key.getBucketId()).getBucketAdvisor()
+        .isPrimary();
+  }
+
+  public Pair<String, Integer> getHostAndPortForKey(RedisKey key) {
+    return getHostPort(key.getBucketId());
+  }
+
+  public Map<String, List<Integer>> getMemberBuckets() {
+    initializeBucketsIfNecessary();
+
+    Map<String, List<Integer>> memberBuckets = new HashMap<>();
+    for (int bucketId = 0; bucketId < REDIS_REGION_BUCKETS; bucketId++) {
+      String memberId = getOrCreateMember(bucketId).getUniqueId();
+      memberBuckets.computeIfAbsent(memberId, k -> new 
ArrayList<>()).add(bucketId);
+    }
+
+    return memberBuckets;
+  }
+
+  /**
+   * This returns a list of {@link MemberBucketSlot}s where each entry 
corresponds to a bucket. If
+   * the details for a given bucket cannot be determined, that entry will 
contain {@code null}.
+   */
+  public synchronized List<MemberBucketSlot> getBucketSlots() {
+    initializeBucketsIfNecessary();
+
+    List<MemberBucketSlot> memberBucketSlots = new 
ArrayList<>(RegionProvider.REDIS_REGION_BUCKETS);
+    for (int bucketId = 0; bucketId < REDIS_REGION_BUCKETS; bucketId++) {
+      Pair<String, Integer> hostPort = getHostPort(bucketId);
+      if (hostPort != null) {
+        memberBucketSlots.add(
+            new MemberBucketSlot(bucketId, hostPort.getLeft(), 
hostPort.getRight()));
+      }
+    }
+
+    return memberBucketSlots;
+  }
+
+  private InternalDistributedMember getOrCreateMember(int bucketId) {
+    return dataRegion.getOrCreateNodeForBucketWrite(bucketId, null);
+  }
+
+  private void initializeBucketsIfNecessary() {
+    if (dataRegion.getDataStore() != null &&
+        dataRegion.getDataStore().getAllLocalBucketIds().isEmpty()) {
+      PartitionRegionHelper.assignBucketsToPartitions(dataRegion);
+    }
+  }
+
+  /**
+   * This method will retry for {@link #HOSTPORT_RETRIEVAL_ATTEMPTS} attempts 
and return null if
+   * no information could be retrieved.
+   */
+  private Pair<String, Integer> getHostPort(int bucketId) {
+    Pair<String, Integer> response;
+
+    for (int i = 0; i < HOSTPORT_RETRIEVAL_ATTEMPTS; i++) {
+      response = getHostPort0(bucketId);
+      if (response != null) {
+        return response;
+      }
+
+      try {
+        Thread.sleep(HOSTPORT_RETRIEVAL_INTERVAL);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);

Review comment:
       Is writing an error back to the client the right way to handle an 
interrupt though? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to