ctubbsii commented on a change in pull request #1584: Moved ThriftClientHandler
out of TabletServer for #1581
URL: https://github.com/apache/accumulo/pull/1584#discussion_r406940540
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -495,2084 +386,218 @@ private static long jitter() {
private final SessionManager sessionManager;
- private final WriteTracker writeTracker = new WriteTracker();
-
- private final RowLocks rowLocks = new RowLocks();
-
private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
private final ReentrantLock recoveryLock = new ReentrantLock(true);
private ThriftClientHandler clientHandler;
private final ServerBulkImportStatus bulkImportStatus = new
ServerBulkImportStatus();
- private class ThriftClientHandler extends ClientServiceHandler
- implements TabletClientService.Iface {
-
- ThriftClientHandler() {
- super(getContext(), watcher, fs);
- log.debug("{} created", ThriftClientHandler.class.getName());
- }
-
- @Override
- public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials,
final long tid,
- final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean
setTime)
- throws ThriftSecurityException {
-
- if (!security.canPerformSystemActions(credentials)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- try {
- return watcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
- List<TKeyExtent> failures = new ArrayList<>();
-
- for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry :
files.entrySet()) {
- TKeyExtent tke = entry.getKey();
- Map<String,MapFileInfo> fileMap = entry.getValue();
- Map<TabletFile,MapFileInfo> fileRefMap = new HashMap<>();
- for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
- Path path = new Path(mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
- path = ns.makeQualified(path);
- fileRefMap.put(new TabletFile(path), mapping.getValue());
- }
-
- Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
- if (importTablet == null) {
- failures.add(tke);
- } else {
- try {
- importTablet.importMapFiles(tid, fileRefMap, setTime);
- } catch (IOException ioe) {
- log.info("files {} not imported to {}: {}", fileMap.keySet(),
new KeyExtent(tke),
- ioe.getMessage());
- failures.add(tke);
- }
- }
- }
- return failures;
- });
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void loadFiles(TInfo tinfo, TCredentials credentials, long tid,
String dir,
- Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
- throws ThriftSecurityException {
- if (!security.canPerformSystemActions(credentials)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- watcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
- tabletImports.forEach((tke, fileMap) -> {
- Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>();
- for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
- Path path = new Path(dir, mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
- path = ns.makeQualified(path);
- newFileMap.put(new TabletFile(path), mapping.getValue());
- }
-
- Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
- if (importTablet != null) {
- try {
- importTablet.importMapFiles(tid, newFileMap, setTime);
- } catch (IOException ioe) {
- log.debug("files {} not imported to {}: {}", fileMap.keySet(),
new KeyExtent(tke),
- ioe.getMessage());
- }
- }
- });
- });
-
- }
-
- private ScanDispatcher getScanDispatcher(KeyExtent extent) {
- if (extent.isRootTablet() || extent.isMeta()) {
- // dispatcher is only for user tables
- return null;
- }
-
- return
getContext().getTableConfiguration(extent.getTableId()).getScanDispatcher();
- }
-
- @Override
- public InitialScan startScan(TInfo tinfo, TCredentials credentials,
TKeyExtent textent,
- TRange range, List<TColumn> columns, int batchSize, List<IterInfo>
ssiList,
- Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites,
- boolean isolated, long readaheadThreshold, TSamplerConfiguration
tSamplerConfig,
- long batchTimeOut, String contextArg, Map<String,String>
executionHints)
- throws NotServingTabletException, ThriftSecurityException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
- TSampleNotPresentException {
-
- TableId tableId = TableId.of(new String(textent.getTable(), UTF_8));
- NamespaceId namespaceId;
- try {
- namespaceId = Tables.getNamespaceId(getContext(), tableId);
- } catch (TableNotFoundException e1) {
- throw new NotServingTabletException(textent);
- }
- if (!security.canScan(credentials, tableId, namespaceId, range, columns,
ssiList, ssio,
- authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- if (!security.authenticatedUserHasAuthorizations(credentials,
authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.BAD_AUTHORIZATIONS);
- }
-
- final KeyExtent extent = new KeyExtent(textent);
-
- // wait for any writes that are in flight.. this done to ensure
- // consistency across client restarts... assume a client writes
- // to accumulo and dies while waiting for a confirmation from
- // accumulo... the client process restarts and tries to read
- // data from accumulo making the assumption that it will get
- // any writes previously made... however if the server side thread
- // processing the write from the dead client is still in progress,
- // the restarted client may not see the write unless we wait here.
- // this behavior is very important when the client is reading the
- // metadata
- if (waitForWrites) {
- writeTracker.waitForWrites(TabletType.type(extent));
- }
-
- Tablet tablet = getOnlineTablet(extent);
- if (tablet == null) {
- throw new NotServingTabletException(textent);
- }
-
- HashSet<Column> columnSet = new HashSet<>();
- for (TColumn tcolumn : columns) {
- columnSet.add(new Column(tcolumn));
- }
+ protected TransactionWatcher getWatcher() {
+ return watcher;
+ }
Review comment:
There's actually another room for improvement (not in this PR... but
subsequently): the ClientServiceHandler (parent class of ThriftClientHandler)
only needs one parameter, not three: the server context. But, that's for a
subsequent change.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services