keith-turner commented on issue #1549: Fixes #1542 - Created a randomIndex member of ThriftTransportPool URL: https://github.com/apache/accumulo/pull/1549#issuecomment-595492380 I think the goal of the fix is good. One way to achieve the goal without changing ThriftTransportPool is to pass a list of one server to it. I tried this locally using the List.subList function and ran the test successfully a bunch of times. Also looking at the code that gets the list of servers I see its incorrect (it never reads from zookeeper again) and way more complicated than needed. I simplified it while experimenting. Below are the diffs from my local testing. ```diff --- a/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/TransportCachingIT.java @@ -18,15 +18,12 @@ */ package org.apache.accumulo.test; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; -import java.util.ArrayList; import java.util.List; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -34,9 +31,7 @@ import org.apache.accumulo.core.clientImpl.ThriftTransportKey; import org.apache.accumulo.core.clientImpl.ThriftTransportPool; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.util.ServerServices; -import org.apache.accumulo.core.util.ServerServices.Service; -import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; @@ -44,61 +39,32 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * Test that {@link ThriftTransportPool} actually adheres to the cachedConnection argument */ public class TransportCachingIT extends AccumuloClusterHarness { private static final Logger log = LoggerFactory.getLogger(TransportCachingIT.class); - private static int ATTEMPTS = 0; @Test public void testCachedTransport() throws InterruptedException { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - while (client.instanceOperations().getTabletServers().isEmpty()) { + + List<String> tservers; + + while ((tservers = client.instanceOperations().getTabletServers()).isEmpty()) { // sleep until a tablet server is up Thread.sleep(50); } + ClientContext context = (ClientContext) client; long rpcTimeout = ConfigurationTypeHelper.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT.getDefaultValue()); - ZooCache zc = context.getZooCache(); - final String zkRoot = context.getZooKeeperRoot(); - - // wait until Zookeeper is populated - List<String> children = zc.getChildren(zkRoot + Constants.ZTSERVERS); - while (children.isEmpty()) { - Thread.sleep(100); - children = zc.getChildren(zkRoot + Constants.ZTSERVERS); - } - - ArrayList<ThriftTransportKey> servers = new ArrayList<>(); - while (servers.isEmpty()) { - for (String tserver : children) { - String path = zkRoot + Constants.ZTSERVERS + "/" + tserver; - byte[] data = zc.getLockData(path); - if (data != null) { - String strData = new String(data, UTF_8); - if (!strData.equals("master")) - servers.add(new ThriftTransportKey( - new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, - context)); - } - } - ATTEMPTS++; - if (!servers.isEmpty()) - break; - else { - if (ATTEMPTS < 100) { - log.warn("Making another attempt to add ThriftTransportKey servers"); - Thread.sleep(100); - } else { - log.error("Failed to add ThriftTransportKey servers - Failing TransportCachingIT test"); - org.junit.Assert - .fail("Failed to add ThriftTransportKey servers - Failing TransportCachingIT test"); - } - } - } + List<ThriftTransportKey> servers = Lists.transform(tservers, + serverStr -> new ThriftTransportKey(HostAndPort.fromString(serverStr), rpcTimeout, + context)); ThriftTransportPool pool = ThriftTransportPool.getInstance(); TTransport first = null; @@ -148,7 +114,7 @@ public class TransportCachingIT extends AccumuloClusterHarness { while (fourth == null) { try { // Get a non-cached transport - fourth = pool.getAnyTransport(servers, false).getSecond(); + fourth = pool.getAnyTransport(servers.subList(0, 1), false).getSecond(); } catch (TTransportException e) { log.warn("Failed obtain 4th transport to {}", servers); } @@ -158,7 +124,7 @@ public class TransportCachingIT extends AccumuloClusterHarness { while (fifth == null) { try { // Get a cached transport - fifth = pool.getAnyTransport(servers, true).getSecond(); + fifth = pool.getAnyTransport(servers.subList(0, 1), true).getSecond(); } catch (TTransportException e) { log.warn("Failed obtain 5th transport to {}", servers); } ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
