ddanielr commented on code in PR #5375:
URL: https://github.com/apache/accumulo/pull/5375#discussion_r1992083227


##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -157,62 +160,149 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce
 
   private static class OnlineLoader extends Loader {
 
+    private final int maxConnections;
     long timeInMillis;
     String fmtTid;
     int locationLess = 0;
 
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> loadMsgs;
+    int tabletsAdded;
 
     // Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
     // active. The purpose of this map is to group load request by tablet servers inorder to do less
     // RPCs. Less RPCs will result in less calls to Zookeeper.
     Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
     private int queuedDataSize = 0;
 
+    public OnlineLoader(AccumuloConfiguration configuration) {
+      super();
+      this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
+    }
+
     @Override
     void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
       super.start(bulkDir, manager, tid, setTime);
 
       timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
       fmtTid = FateTxId.formatTid(tid);
 
-      loadMsgs = new MapCounter<>();
+      tabletsAdded = 0;
 
       loadQueue = new HashMap<>();
     }
 
+    private static class Client {
+      final HostAndPort server;
+      final TabletClientService.Client service;
+
+      private Client(HostAndPort server, TabletClientService.Client service) {
+        this.server = server;
+        this.service = service;
+      }
+    }
+
     private void sendQueued(int threshhold) {
       if (queuedDataSize > threshhold || threshhold == 0) {
-        loadQueue.forEach((server, tabletFiles) -> {
+        var sendTimer = Timer.startNew();
+
+        List<Client> clients = new ArrayList<>();
+        try {
+
+          // Send load messages to tablet servers spinning up work, but do not wait on results.
+          loadQueue.forEach((server, tabletFiles) -> {
+
+            if (log.isTraceEnabled()) {
+              log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
+                  tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
+            }
+
+            // On the server side tablets are processed serially with a write to the metadata table
+            // done for each tablet. Chunk the work up for a tablet server up so that it can be sent
+            // over multiple connections allowing it to run in parallel on the server side. This
+            // allows multiple threads on a single tserver to do metadata writes for this bulk
+            // import.
+            int neededConnections = Math.min(maxConnections, tabletFiles.size());
+            List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
+                new ArrayList<>(neededConnections);
+            for (int i = 0; i < neededConnections; i++) {
+              chunks.add(new HashMap<>());
+            }
+
+            int nextConnection = 0;
+            for (var entry : tabletFiles.entrySet()) {
+              chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
+            }
+
+            for (var chunk : chunks) {
+              try {
+                var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
+                    manager.getContext(), timeInMillis);
+                // add client to list before calling send in case there is an exception, this makes
+                // sure its returned in the finally
+                clients.add(new Client(server, client));
+                client.send_loadFilesV2(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
+                    bulkDir.toString(), chunk, setTime);
+              } catch (TException ex) {
+                log.debug("rpc failed server: {}, {}", server, fmtTid, ex);

Review Comment:
   ```suggestion
                   log.debug("rpc send failed server: {}, {}", server, fmtTid, 
ex);
   ```



##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -266,10 +356,12 @@ long finish() {
       sendQueued(0);
 
       long sleepTime = 0;
-      if (loadMsgs.size() > 0) {
-        // find which tablet server had the most load messages sent to it and sleep 13ms for each
-        // load message
-        sleepTime = loadMsgs.max() * 13;
+      if (tabletsAdded > 0) {
+        // Waited for all the tablet servers to process everything so a long sleep is not needed.
+        // Even though this code waited, it does not know what succeeded on the tablet server side
+        // and it did not track if there were connection errors. Since success status is unknown
+        // must return a non-zero sleep to indicate another scan of the metadata table is needed.
+        sleepTime = 1;

Review Comment:
   Removing that long sleep is a nice improvement.



##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -157,62 +160,149 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce
 
   private static class OnlineLoader extends Loader {
 
+    private final int maxConnections;
     long timeInMillis;
     String fmtTid;
     int locationLess = 0;
 
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> loadMsgs;
+    int tabletsAdded;
 
     // Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
     // active. The purpose of this map is to group load request by tablet servers inorder to do less
     // RPCs. Less RPCs will result in less calls to Zookeeper.
     Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
     private int queuedDataSize = 0;
 
+    public OnlineLoader(AccumuloConfiguration configuration) {
+      super();
+      this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
+    }
+
     @Override
     void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
       super.start(bulkDir, manager, tid, setTime);
 
       timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
       fmtTid = FateTxId.formatTid(tid);
 
-      loadMsgs = new MapCounter<>();
+      tabletsAdded = 0;
 
       loadQueue = new HashMap<>();
     }
 
+    private static class Client {
+      final HostAndPort server;
+      final TabletClientService.Client service;
+
+      private Client(HostAndPort server, TabletClientService.Client service) {
+        this.server = server;
+        this.service = service;
+      }
+    }
+
     private void sendQueued(int threshhold) {
       if (queuedDataSize > threshhold || threshhold == 0) {
-        loadQueue.forEach((server, tabletFiles) -> {
+        var sendTimer = Timer.startNew();
+
+        List<Client> clients = new ArrayList<>();
+        try {
+
+          // Send load messages to tablet servers spinning up work, but do not wait on results.
+          loadQueue.forEach((server, tabletFiles) -> {
+
+            if (log.isTraceEnabled()) {
+              log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
+                  tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
+            }
+
+            // On the server side tablets are processed serially with a write to the metadata table
+            // done for each tablet. Chunk the work up for a tablet server up so that it can be sent
+            // over multiple connections allowing it to run in parallel on the server side. This
+            // allows multiple threads on a single tserver to do metadata writes for this bulk
+            // import.
+            int neededConnections = Math.min(maxConnections, tabletFiles.size());
+            List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
+                new ArrayList<>(neededConnections);
+            for (int i = 0; i < neededConnections; i++) {
+              chunks.add(new HashMap<>());
+            }
+
+            int nextConnection = 0;
+            for (var entry : tabletFiles.entrySet()) {
+              chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
+            }
+
+            for (var chunk : chunks) {
+              try {
+                var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
+                    manager.getContext(), timeInMillis);
+                // add client to list before calling send in case there is an exception, this makes
+                // sure its returned in the finally
+                clients.add(new Client(server, client));
+                client.send_loadFilesV2(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
+                    bulkDir.toString(), chunk, setTime);
+              } catch (TException ex) {
+                log.debug("rpc failed server: {}, {}", server, fmtTid, ex);
+              }
+            }
+          });
+
+          long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+          sendTimer.restart();
+
+          int outdatedTservers = 0;
+
+          // wait for all the tservers to complete processing
+          for (var client : clients) {
+            try {
+              client.service.recv_loadFilesV2();
+            } catch (TException ex) {
+              String additionalInfo = "";
+              if (ex instanceof TApplicationException) {
+                if (((TApplicationException) ex).getType()
+                    == TApplicationException.UNKNOWN_METHOD) {
+                  // A new RPC method was added in 2.1.4, a tserver running 2.1.3 or earlier will
+                  // not have this RPC. This should not kill the fate operation, it can wait until
+                  // all tablet servers are upgraded.
+                  outdatedTservers++;
+                  additionalInfo = " (tserver may be running older version)";
+                }
+              }
+              log.debug("rpc failed server{}: {}, {}", additionalInfo, client.server, fmtTid, ex);

Review Comment:
   ```suggestion
                 log.debug("rpc recv failed server{}: {}, {}", additionalInfo, 
client.server, fmtTid, ex);
   ```



##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -157,62 +160,149 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce
 
   private static class OnlineLoader extends Loader {
 
+    private final int maxConnections;
     long timeInMillis;
     String fmtTid;
     int locationLess = 0;
 
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> loadMsgs;
+    int tabletsAdded;
 
     // Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
     // active. The purpose of this map is to group load request by tablet servers inorder to do less
     // RPCs. Less RPCs will result in less calls to Zookeeper.
     Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
     private int queuedDataSize = 0;
 
+    public OnlineLoader(AccumuloConfiguration configuration) {
+      super();
+      this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
+    }
+
     @Override
     void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
       super.start(bulkDir, manager, tid, setTime);
 
       timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
       fmtTid = FateTxId.formatTid(tid);
 
-      loadMsgs = new MapCounter<>();
+      tabletsAdded = 0;
 
       loadQueue = new HashMap<>();
     }
 
+    private static class Client {
+      final HostAndPort server;
+      final TabletClientService.Client service;
+
+      private Client(HostAndPort server, TabletClientService.Client service) {
+        this.server = server;
+        this.service = service;
+      }
+    }
+
     private void sendQueued(int threshhold) {
       if (queuedDataSize > threshhold || threshhold == 0) {
-        loadQueue.forEach((server, tabletFiles) -> {
+        var sendTimer = Timer.startNew();
+
+        List<Client> clients = new ArrayList<>();
+        try {
+
+          // Send load messages to tablet servers spinning up work, but do not wait on results.
+          loadQueue.forEach((server, tabletFiles) -> {
+
+            if (log.isTraceEnabled()) {
+              log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
+                  tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
+            }
+
+            // On the server side tablets are processed serially with a write to the metadata table
+            // done for each tablet. Chunk the work up for a tablet server up so that it can be sent
+            // over multiple connections allowing it to run in parallel on the server side. This
+            // allows multiple threads on a single tserver to do metadata writes for this bulk
+            // import.
+            int neededConnections = Math.min(maxConnections, tabletFiles.size());
+            List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
+                new ArrayList<>(neededConnections);
+            for (int i = 0; i < neededConnections; i++) {
+              chunks.add(new HashMap<>());
+            }
+
+            int nextConnection = 0;
+            for (var entry : tabletFiles.entrySet()) {
+              chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
+            }
+
+            for (var chunk : chunks) {
+              try {
+                var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
+                    manager.getContext(), timeInMillis);
+                // add client to list before calling send in case there is an exception, this makes
+                // sure its returned in the finally
+                clients.add(new Client(server, client));
+                client.send_loadFilesV2(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
+                    bulkDir.toString(), chunk, setTime);
+              } catch (TException ex) {
+                log.debug("rpc failed server: {}, {}", server, fmtTid, ex);
+              }
+            }
+          });
+
+          long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+          sendTimer.restart();
+
+          int outdatedTservers = 0;
+
+          // wait for all the tservers to complete processing
+          for (var client : clients) {
+            try {
+              client.service.recv_loadFilesV2();
+            } catch (TException ex) {
+              String additionalInfo = "";
+              if (ex instanceof TApplicationException) {
+                if (((TApplicationException) ex).getType()
+                    == TApplicationException.UNKNOWN_METHOD) {
+                  // A new RPC method was added in 2.1.4, a tserver running 2.1.3 or earlier will
+                  // not have this RPC. This should not kill the fate operation, it can wait until
+                  // all tablet servers are upgraded.
+                  outdatedTservers++;
+                  additionalInfo = " (tserver may be running older version)";
+                }
+              }
+              log.debug("rpc failed server{}: {}, {}", additionalInfo, client.server, fmtTid, ex);
+            }
+          }
 
-          if (log.isTraceEnabled()) {
-            log.trace("{} asking {} to bulk import {} files for {} tablets", 
fmtTid, server,
-                tabletFiles.values().stream().mapToInt(Map::size).sum(), 
tabletFiles.size());
+          if (outdatedTservers > 0) {
+            log.info(

Review Comment:
   Should this be a warn level message since the bulk import will never make progress?



##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -157,62 +160,149 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce
 
   private static class OnlineLoader extends Loader {
 
+    private final int maxConnections;
     long timeInMillis;
     String fmtTid;
     int locationLess = 0;
 
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> loadMsgs;
+    int tabletsAdded;
 
     // Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
     // active. The purpose of this map is to group load request by tablet servers inorder to do less
     // RPCs. Less RPCs will result in less calls to Zookeeper.
     Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
     private int queuedDataSize = 0;
 
+    public OnlineLoader(AccumuloConfiguration configuration) {
+      super();
+      this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
+    }
+
     @Override
     void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
       super.start(bulkDir, manager, tid, setTime);
 
       timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
       fmtTid = FateTxId.formatTid(tid);
 
-      loadMsgs = new MapCounter<>();
+      tabletsAdded = 0;
 
       loadQueue = new HashMap<>();
     }
 
+    private static class Client {
+      final HostAndPort server;
+      final TabletClientService.Client service;
+
+      private Client(HostAndPort server, TabletClientService.Client service) {
+        this.server = server;
+        this.service = service;
+      }
+    }
+
     private void sendQueued(int threshhold) {
       if (queuedDataSize > threshhold || threshhold == 0) {
-        loadQueue.forEach((server, tabletFiles) -> {
+        var sendTimer = Timer.startNew();
+
+        List<Client> clients = new ArrayList<>();
+        try {
+
+          // Send load messages to tablet servers spinning up work, but do not wait on results.
+          loadQueue.forEach((server, tabletFiles) -> {
+
+            if (log.isTraceEnabled()) {
+              log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
+                  tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
+            }
+
+            // On the server side tablets are processed serially with a write to the metadata table
+            // done for each tablet. Chunk the work up for a tablet server up so that it can be sent
+            // over multiple connections allowing it to run in parallel on the server side. This
+            // allows multiple threads on a single tserver to do metadata writes for this bulk
+            // import.

Review Comment:
   ```suggestion
               // Tablet servers process tablets serially and perform a single metadata table write for each tablet.
               // Break the work into per-tablet chunks so it can be sent over multiple connections
               // to the tserver, allowing each chunk to be run in parallel on the server side.
               // This allows multiple threads on a single tserver to do metadata writes for this bulk
               // import.
   ```
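   
   For reference, here is a minimal standalone sketch of the round-robin chunking this comment describes. It uses a hypothetical `ChunkingSketch` class with the tablet/file types simplified to plain strings; it is not the actual `LoadFiles` code.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Illustrative sketch only: splits a per-server tablet->files map into at most
   // maxConnections chunks, assigning tablets round-robin so each chunk could be
   // sent on its own connection and processed in parallel on the server side.
   public class ChunkingSketch {
   
     static List<Map<String,List<String>>> chunk(Map<String,List<String>> tabletFiles,
         int maxConnections) {
       int neededConnections = Math.min(maxConnections, tabletFiles.size());
       List<Map<String,List<String>>> chunks = new ArrayList<>(neededConnections);
       for (int i = 0; i < neededConnections; i++) {
         chunks.add(new HashMap<>());
       }
       int next = 0;
       for (var entry : tabletFiles.entrySet()) {
         chunks.get(next++ % chunks.size()).put(entry.getKey(), entry.getValue());
       }
       return chunks;
     }
   
     public static void main(String[] args) {
       Map<String,List<String>> tabletFiles = Map.of(
           "tablet1", List.of("f1.rf"), "tablet2", List.of("f2.rf"),
           "tablet3", List.of("f3.rf"), "tablet4", List.of("f4.rf"),
           "tablet5", List.of("f5.rf"));
       // With maxConnections = 2, the five tablets land in two chunks of sizes 3 and 2.
       chunk(tabletFiles, 2).forEach(c -> System.out.println(c.keySet()));
     }
   }
   ```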



##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java:
##########
@@ -157,62 +160,149 @@ void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exce
 
   private static class OnlineLoader extends Loader {
 
+    private final int maxConnections;
     long timeInMillis;
     String fmtTid;
     int locationLess = 0;
 
-    // track how many tablets were sent load messages per tablet server
-    MapCounter<HostAndPort> loadMsgs;
+    int tabletsAdded;
 
     // Each RPC to a tablet server needs to check in zookeeper to see if the transaction is still
     // active. The purpose of this map is to group load request by tablet servers inorder to do less
     // RPCs. Less RPCs will result in less calls to Zookeeper.
     Map<HostAndPort,Map<TKeyExtent,Map<String,MapFileInfo>>> loadQueue;
     private int queuedDataSize = 0;
 
+    public OnlineLoader(AccumuloConfiguration configuration) {
+      super();
+      this.maxConnections = configuration.getCount(Property.MANAGER_BULK_MAX_CONNECTIONS);
+    }
+
     @Override
     void start(Path bulkDir, Manager manager, long tid, boolean setTime) throws Exception {
       super.start(bulkDir, manager, tid, setTime);
 
       timeInMillis = manager.getConfiguration().getTimeInMillis(Property.MANAGER_BULK_TIMEOUT);
       fmtTid = FateTxId.formatTid(tid);
 
-      loadMsgs = new MapCounter<>();
+      tabletsAdded = 0;
 
       loadQueue = new HashMap<>();
     }
 
+    private static class Client {
+      final HostAndPort server;
+      final TabletClientService.Client service;
+
+      private Client(HostAndPort server, TabletClientService.Client service) {
+        this.server = server;
+        this.service = service;
+      }
+    }
+
     private void sendQueued(int threshhold) {
       if (queuedDataSize > threshhold || threshhold == 0) {
-        loadQueue.forEach((server, tabletFiles) -> {
+        var sendTimer = Timer.startNew();
+
+        List<Client> clients = new ArrayList<>();
+        try {
+
+          // Send load messages to tablet servers spinning up work, but do not wait on results.
+          loadQueue.forEach((server, tabletFiles) -> {
+
+            if (log.isTraceEnabled()) {
+              log.trace("{} asking {} to bulk import {} files for {} tablets", fmtTid, server,
+                  tabletFiles.values().stream().mapToInt(Map::size).sum(), tabletFiles.size());
+            }
+
+            // On the server side tablets are processed serially with a write to the metadata table
+            // done for each tablet. Chunk the work up for a tablet server up so that it can be sent
+            // over multiple connections allowing it to run in parallel on the server side. This
+            // allows multiple threads on a single tserver to do metadata writes for this bulk
+            // import.
+            int neededConnections = Math.min(maxConnections, tabletFiles.size());
+            List<Map<TKeyExtent,Map<String,MapFileInfo>>> chunks =
+                new ArrayList<>(neededConnections);
+            for (int i = 0; i < neededConnections; i++) {
+              chunks.add(new HashMap<>());
+            }
+
+            int nextConnection = 0;
+            for (var entry : tabletFiles.entrySet()) {
+              chunks.get(nextConnection++ % chunks.size()).put(entry.getKey(), entry.getValue());
+            }
+
+            for (var chunk : chunks) {
+              try {
+                var client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, server,
+                    manager.getContext(), timeInMillis);
+                // add client to list before calling send in case there is an exception, this makes
+                // sure its returned in the finally
+                clients.add(new Client(server, client));
+                client.send_loadFilesV2(TraceUtil.traceInfo(), manager.getContext().rpcCreds(), tid,
+                    bulkDir.toString(), chunk, setTime);
+              } catch (TException ex) {
+                log.debug("rpc failed server: {}, {}", server, fmtTid, ex);
+              }
+            }
+          });
+
+          long sendTime = sendTimer.elapsed(TimeUnit.MILLISECONDS);
+          sendTimer.restart();
+
+          int outdatedTservers = 0;
+
+          // wait for all the tservers to complete processing
+          for (var client : clients) {
+            try {
+              client.service.recv_loadFilesV2();
+            } catch (TException ex) {
+              String additionalInfo = "";
+              if (ex instanceof TApplicationException) {
+                if (((TApplicationException) ex).getType()
+                    == TApplicationException.UNKNOWN_METHOD) {

Review Comment:
   Is this type check and cast able to be done in a single if statement?
   
   ```suggestion
                 if (ex instanceof TApplicationException && ((TApplicationException) ex).getType()
                      == TApplicationException.UNKNOWN_METHOD) {
   ```
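   
   If the single-if form is used and this branch's target JDK permits it, pattern matching for `instanceof` (standard since Java 16) would also remove the explicit cast. A purely illustrative alternative, not something this PR requires:
   
   ```java
   // Alternative sketch using pattern matching for instanceof (requires Java 16+,
   // so only applicable if the module's target release allows it).
   if (ex instanceof TApplicationException tae
       && tae.getType() == TApplicationException.UNKNOWN_METHOD) {
     // same outdated-tserver handling as above
   }
   ```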



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
