keith-turner commented on a change in pull request #1584: Moved
ThriftClientHandler out of TabletServer for #1581
URL: https://github.com/apache/accumulo/pull/1584#discussion_r408129032
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -493,1649 +374,19 @@ private static long jitter() {
return (long) ((1. + (r.nextDouble() / 10)) *
TabletServer.TIME_BETWEEN_LOCATOR_CACHE_CLEARS);
}
- private final SessionManager sessionManager;
+ final SessionManager sessionManager;
- private final WriteTracker writeTracker = new WriteTracker();
-
- private final RowLocks rowLocks = new RowLocks();
-
- private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
- private final ReentrantLock recoveryLock = new ReentrantLock(true);
- private ThriftClientHandler clientHandler;
- private final ServerBulkImportStatus bulkImportStatus = new
ServerBulkImportStatus();
-
- private class ThriftClientHandler extends ClientServiceHandler
- implements TabletClientService.Iface {
-
- ThriftClientHandler() {
- super(getContext(), watcher, fs);
- log.debug("{} created", ThriftClientHandler.class.getName());
- }
-
- @Override
- public List<TKeyExtent> bulkImport(TInfo tinfo, TCredentials credentials,
final long tid,
- final Map<TKeyExtent,Map<String,MapFileInfo>> files, final boolean
setTime)
- throws ThriftSecurityException {
-
- if (!security.canPerformSystemActions(credentials)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- try {
- return watcher.run(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
- List<TKeyExtent> failures = new ArrayList<>();
-
- for (Entry<TKeyExtent,Map<String,MapFileInfo>> entry :
files.entrySet()) {
- TKeyExtent tke = entry.getKey();
- Map<String,MapFileInfo> fileMap = entry.getValue();
- Map<TabletFile,MapFileInfo> fileRefMap = new HashMap<>();
- for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
- Path path = new Path(mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
- path = ns.makeQualified(path);
- fileRefMap.put(new TabletFile(path), mapping.getValue());
- }
-
- Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
- if (importTablet == null) {
- failures.add(tke);
- } else {
- try {
- importTablet.importMapFiles(tid, fileRefMap, setTime);
- } catch (IOException ioe) {
- log.info("files {} not imported to {}: {}", fileMap.keySet(),
new KeyExtent(tke),
- ioe.getMessage());
- failures.add(tke);
- }
- }
- }
- return failures;
- });
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void loadFiles(TInfo tinfo, TCredentials credentials, long tid,
String dir,
- Map<TKeyExtent,Map<String,MapFileInfo>> tabletImports, boolean setTime)
- throws ThriftSecurityException {
- if (!security.canPerformSystemActions(credentials)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- watcher.runQuietly(Constants.BULK_ARBITRATOR_TYPE, tid, () -> {
- tabletImports.forEach((tke, fileMap) -> {
- Map<TabletFile,MapFileInfo> newFileMap = new HashMap<>();
- for (Entry<String,MapFileInfo> mapping : fileMap.entrySet()) {
- Path path = new Path(dir, mapping.getKey());
- FileSystem ns = fs.getFileSystemByPath(path);
- path = ns.makeQualified(path);
- newFileMap.put(new TabletFile(path), mapping.getValue());
- }
-
- Tablet importTablet = getOnlineTablet(new KeyExtent(tke));
-
- if (importTablet != null) {
- try {
- importTablet.importMapFiles(tid, newFileMap, setTime);
- } catch (IOException ioe) {
- log.debug("files {} not imported to {}: {}", fileMap.keySet(),
new KeyExtent(tke),
- ioe.getMessage());
- }
- }
- });
- });
-
- }
-
- private ScanDispatcher getScanDispatcher(KeyExtent extent) {
- if (extent.isRootTablet() || extent.isMeta()) {
- // dispatcher is only for user tables
- return null;
- }
-
- return
getContext().getTableConfiguration(extent.getTableId()).getScanDispatcher();
- }
-
- @Override
- public InitialScan startScan(TInfo tinfo, TCredentials credentials,
TKeyExtent textent,
- TRange range, List<TColumn> columns, int batchSize, List<IterInfo>
ssiList,
- Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites,
- boolean isolated, long readaheadThreshold, TSamplerConfiguration
tSamplerConfig,
- long batchTimeOut, String contextArg, Map<String,String>
executionHints)
- throws NotServingTabletException, ThriftSecurityException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
- TSampleNotPresentException {
-
- TableId tableId = TableId.of(new String(textent.getTable(), UTF_8));
- NamespaceId namespaceId;
- try {
- namespaceId = Tables.getNamespaceId(getContext(), tableId);
- } catch (TableNotFoundException e1) {
- throw new NotServingTabletException(textent);
- }
- if (!security.canScan(credentials, tableId, namespaceId, range, columns,
ssiList, ssio,
- authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- if (!security.authenticatedUserHasAuthorizations(credentials,
authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.BAD_AUTHORIZATIONS);
- }
-
- final KeyExtent extent = new KeyExtent(textent);
-
- // wait for any writes that are in flight.. this done to ensure
- // consistency across client restarts... assume a client writes
- // to accumulo and dies while waiting for a confirmation from
- // accumulo... the client process restarts and tries to read
- // data from accumulo making the assumption that it will get
- // any writes previously made... however if the server side thread
- // processing the write from the dead client is still in progress,
- // the restarted client may not see the write unless we wait here.
- // this behavior is very important when the client is reading the
- // metadata
- if (waitForWrites) {
- writeTracker.waitForWrites(TabletType.type(extent));
- }
-
- Tablet tablet = getOnlineTablet(extent);
- if (tablet == null) {
- throw new NotServingTabletException(textent);
- }
-
- HashSet<Column> columnSet = new HashSet<>();
- for (TColumn tcolumn : columns) {
- columnSet.add(new Column(tcolumn));
- }
-
- ScanParameters scanParams = new ScanParameters(batchSize, new
Authorizations(authorizations),
- columnSet, ssiList, ssio, isolated,
SamplerConfigurationImpl.fromThrift(tSamplerConfig),
- batchTimeOut, contextArg);
-
- final SingleScanSession scanSession = new SingleScanSession(credentials,
extent, scanParams,
- readaheadThreshold, executionHints);
- scanSession.scanner =
- tablet.createScanner(new Range(range), scanParams,
scanSession.interruptFlag);
-
- long sid = sessionManager.createSession(scanSession, true);
-
- ScanResult scanResult;
- try {
- scanResult = continueScan(tinfo, sid, scanSession);
- } catch (NoSuchScanIDException e) {
- log.error("The impossible happened", e);
- throw new RuntimeException();
- } finally {
- sessionManager.unreserveSession(sid);
- }
-
- return new InitialScan(sid, scanResult);
- }
-
- @Override
- public ScanResult continueScan(TInfo tinfo, long scanID)
- throws NoSuchScanIDException, NotServingTabletException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
- TSampleNotPresentException {
- SingleScanSession scanSession = (SingleScanSession)
sessionManager.reserveSession(scanID);
- if (scanSession == null) {
- throw new NoSuchScanIDException();
- }
-
- try {
- return continueScan(tinfo, scanID, scanSession);
- } finally {
- sessionManager.unreserveSession(scanSession);
- }
- }
-
- private ScanResult continueScan(TInfo tinfo, long scanID,
SingleScanSession scanSession)
- throws NoSuchScanIDException, NotServingTabletException,
- org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
- TSampleNotPresentException {
-
- if (scanSession.nextBatchTask == null) {
- scanSession.nextBatchTask =
- new NextBatchTask(TabletServer.this, scanID,
scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent,
getScanDispatcher(scanSession.extent),
- scanSession, scanSession.nextBatchTask);
- }
-
- ScanBatch bresult;
- try {
- bresult =
scanSession.nextBatchTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
- TimeUnit.MILLISECONDS);
- scanSession.nextBatchTask = null;
- } catch (ExecutionException e) {
- sessionManager.removeSession(scanID);
- if (e.getCause() instanceof NotServingTabletException) {
- throw (NotServingTabletException) e.getCause();
- } else if (e.getCause() instanceof TooManyFilesException) {
- throw new
org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException(
- scanSession.extent.toThrift());
- } else if (e.getCause() instanceof SampleNotPresentException) {
- throw new TSampleNotPresentException(scanSession.extent.toThrift());
- } else if (e.getCause() instanceof IOException) {
- sleepUninterruptibly(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
TimeUnit.MILLISECONDS);
- List<KVEntry> empty = Collections.emptyList();
- bresult = new ScanBatch(empty, true);
- scanSession.nextBatchTask = null;
- } else {
- throw new RuntimeException(e);
- }
- } catch (CancellationException ce) {
- sessionManager.removeSession(scanID);
- Tablet tablet = getOnlineTablet(scanSession.extent);
- if (tablet == null || tablet.isClosed()) {
- throw new NotServingTabletException(scanSession.extent.toThrift());
- } else {
- throw new NoSuchScanIDException();
- }
- } catch (TimeoutException e) {
- List<TKeyValue> param = Collections.emptyList();
- long timeout =
-
TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- sessionManager.removeIfNotAccessed(scanID, timeout);
- return new ScanResult(param, true);
- } catch (Throwable t) {
- sessionManager.removeSession(scanID);
- log.warn("Failed to get next batch", t);
- throw new RuntimeException(t);
- }
-
- ScanResult scanResult = new
ScanResult(Key.compress(bresult.getResults()), bresult.isMore());
-
- scanSession.entriesReturned += scanResult.results.size();
-
- scanSession.batchCount++;
-
- if (scanResult.more && scanSession.batchCount >
scanSession.readaheadThreshold) {
- // start reading next batch while current batch is transmitted
- // to client
- scanSession.nextBatchTask =
- new NextBatchTask(TabletServer.this, scanID,
scanSession.interruptFlag);
- resourceManager.executeReadAhead(scanSession.extent,
getScanDispatcher(scanSession.extent),
- scanSession, scanSession.nextBatchTask);
- }
-
- if (!scanResult.more) {
- closeScan(tinfo, scanID);
- }
-
- return scanResult;
- }
-
- @Override
- public void closeScan(TInfo tinfo, long scanID) {
- final SingleScanSession ss = (SingleScanSession)
sessionManager.removeSession(scanID);
- if (ss != null) {
- long t2 = System.currentTimeMillis();
-
- if (log.isTraceEnabled()) {
- log.trace(String.format("ScanSess tid %s %s %,d entries in %.2f
secs, nbTimes = [%s] ",
- TServerUtils.clientAddress.get(), ss.extent.getTableId(),
ss.entriesReturned,
- (t2 - ss.startTime) / 1000.0, ss.runStats.toString()));
- }
-
- scanMetrics.addScan(t2 - ss.startTime);
- scanMetrics.addResult(ss.entriesReturned);
- }
- }
-
- @Override
- public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials
credentials,
- Map<TKeyExtent,List<TRange>> tbatch, List<TColumn> tcolumns,
List<IterInfo> ssiList,
- Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations,
boolean waitForWrites,
- TSamplerConfiguration tSamplerConfig, long batchTimeOut, String
contextArg,
- Map<String,String> executionHints)
- throws ThriftSecurityException, TSampleNotPresentException {
- // find all of the tables that need to be scanned
- final HashSet<TableId> tables = new HashSet<>();
- for (TKeyExtent keyExtent : tbatch.keySet()) {
- tables.add(TableId.of(new String(keyExtent.getTable(), UTF_8)));
- }
-
- if (tables.size() != 1) {
- throw new IllegalArgumentException("Cannot batch scan over multiple
tables");
- }
-
- // check if user has permission to the tables
- for (TableId tableId : tables) {
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canScan(credentials, tableId, namespaceId, tbatch,
tcolumns, ssiList, ssio,
- authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
- }
-
- try {
- if (!security.authenticatedUserHasAuthorizations(credentials,
authorizations)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.BAD_AUTHORIZATIONS);
- }
- } catch (ThriftSecurityException tse) {
- log.error("{} is not authorized", credentials.getPrincipal(), tse);
- throw tse;
- }
- Map<KeyExtent,List<Range>> batch = Translator.translate(tbatch, new
TKeyExtentTranslator(),
- new Translator.ListTranslator<>(new TRangeTranslator()));
-
- // This is used to determine which thread pool to use
- KeyExtent threadPoolExtent = batch.keySet().iterator().next();
-
- if (waitForWrites) {
- writeTracker.waitForWrites(TabletType.type(batch.keySet()));
- }
-
- Set<Column> columnSet = tcolumns.isEmpty() ? Collections.emptySet()
- : new HashSet<>(Collections2.transform(tcolumns, Column::new));
-
- ScanParameters scanParams =
- new ScanParameters(-1, new Authorizations(authorizations),
columnSet, ssiList, ssio,
- false, SamplerConfigurationImpl.fromThrift(tSamplerConfig),
batchTimeOut, contextArg);
-
- final MultiScanSession mss =
- new MultiScanSession(credentials, threadPoolExtent, batch,
scanParams, executionHints);
-
- mss.numTablets = batch.size();
- for (List<Range> ranges : batch.values()) {
- mss.numRanges += ranges.size();
- }
-
- long sid = sessionManager.createSession(mss, true);
-
- MultiScanResult result;
- try {
- result = continueMultiScan(sid, mss);
- } finally {
- sessionManager.unreserveSession(sid);
- }
-
- return new InitialMultiScan(sid, result);
- }
-
- @Override
- public MultiScanResult continueMultiScan(TInfo tinfo, long scanID)
- throws NoSuchScanIDException, TSampleNotPresentException {
-
- MultiScanSession session = (MultiScanSession)
sessionManager.reserveSession(scanID);
-
- if (session == null) {
- throw new NoSuchScanIDException();
- }
-
- try {
- return continueMultiScan(scanID, session);
- } finally {
- sessionManager.unreserveSession(session);
- }
- }
-
- private MultiScanResult continueMultiScan(long scanID, MultiScanSession
session)
- throws TSampleNotPresentException {
-
- if (session.lookupTask == null) {
- session.lookupTask = new LookupTask(TabletServer.this, scanID);
- resourceManager.executeReadAhead(session.threadPoolExtent,
- getScanDispatcher(session.threadPoolExtent), session,
session.lookupTask);
- }
-
- try {
- MultiScanResult scanResult =
- session.lookupTask.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
TimeUnit.MILLISECONDS);
- session.lookupTask = null;
- return scanResult;
- } catch (ExecutionException e) {
- sessionManager.removeSession(scanID);
- if (e.getCause() instanceof SampleNotPresentException) {
- throw new TSampleNotPresentException();
- } else {
- log.warn("Failed to get multiscan result", e);
- throw new RuntimeException(e);
- }
- } catch (TimeoutException e1) {
- long timeout =
-
TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- sessionManager.removeIfNotAccessed(scanID, timeout);
- List<TKeyValue> results = Collections.emptyList();
- Map<TKeyExtent,List<TRange>> failures = Collections.emptyMap();
- List<TKeyExtent> fullScans = Collections.emptyList();
- return new MultiScanResult(results, failures, fullScans, null, null,
false, true);
- } catch (Throwable t) {
- sessionManager.removeSession(scanID);
- log.warn("Failed to get multiscan result", t);
- throw new RuntimeException(t);
- }
- }
-
- @Override
- public void closeMultiScan(TInfo tinfo, long scanID) throws
NoSuchScanIDException {
- MultiScanSession session = (MultiScanSession)
sessionManager.removeSession(scanID);
- if (session == null) {
- throw new NoSuchScanIDException();
- }
-
- long t2 = System.currentTimeMillis();
-
- if (log.isTraceEnabled()) {
- log.trace(String.format(
- "MultiScanSess %s %,d entries in %.2f secs"
- + " (lookup_time:%.2f secs tablets:%,d ranges:%,d) ",
- TServerUtils.clientAddress.get(), session.numEntries, (t2 -
session.startTime) / 1000.0,
- session.totalLookupTime / 1000.0, session.numTablets,
session.numRanges));
- }
- }
-
- @Override
- public long startUpdate(TInfo tinfo, TCredentials credentials, TDurability
tdurabilty)
- throws ThriftSecurityException {
- // Make sure user is real
- Durability durability = DurabilityImpl.fromThrift(tdurabilty);
- security.authenticateUser(credentials, credentials);
- updateMetrics.addPermissionErrors(0);
-
- UpdateSession us = new UpdateSession(
- new TservConstraintEnv(getContext(), security, credentials),
credentials, durability);
- return sessionManager.createSession(us, false);
- }
-
- private void setUpdateTablet(UpdateSession us, KeyExtent keyExtent) {
- long t1 = System.currentTimeMillis();
- if (us.currentTablet != null &&
us.currentTablet.getExtent().equals(keyExtent)) {
- return;
- }
- if (us.currentTablet == null
- && (us.failures.containsKey(keyExtent) ||
us.authFailures.containsKey(keyExtent))) {
- // if there were previous failures, then do not accept additional
writes
- return;
- }
-
- TableId tableId = null;
- try {
- // if user has no permission to write to this table, add it to
- // the failures list
- boolean sameTable = us.currentTablet != null
- &&
(us.currentTablet.getExtent().getTableId().equals(keyExtent.getTableId()));
- tableId = keyExtent.getTableId();
- if (sameTable || security.canWrite(us.getCredentials(), tableId,
- Tables.getNamespaceId(getContext(), tableId))) {
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = getOnlineTablet(keyExtent);
- if (us.currentTablet != null) {
- us.queuedMutations.put(us.currentTablet, new ArrayList<>());
- } else {
- // not serving tablet, so report all mutations as
- // failures
- us.failures.put(keyExtent, 0L);
- updateMetrics.addUnknownTabletErrors(0);
- }
- } else {
- log.warn("Denying access to table {} for user {}",
keyExtent.getTableId(), us.getUser());
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = null;
- us.authFailures.put(keyExtent, SecurityErrorCode.PERMISSION_DENIED);
- updateMetrics.addPermissionErrors(0);
- return;
- }
- } catch (TableNotFoundException tnfe) {
- log.error("Table " + tableId + " not found ", tnfe);
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = null;
- us.authFailures.put(keyExtent, SecurityErrorCode.TABLE_DOESNT_EXIST);
- updateMetrics.addUnknownTabletErrors(0);
- return;
- } catch (ThriftSecurityException e) {
- log.error("Denying permission to check user " + us.getUser() + " with
user " + e.getUser(),
- e);
- long t2 = System.currentTimeMillis();
- us.authTimes.addStat(t2 - t1);
- us.currentTablet = null;
- us.authFailures.put(keyExtent, e.getCode());
- updateMetrics.addPermissionErrors(0);
- return;
- }
- }
-
- @Override
- public void applyUpdates(TInfo tinfo, long updateID, TKeyExtent tkeyExtent,
- List<TMutation> tmutations) {
- UpdateSession us = (UpdateSession)
sessionManager.reserveSession(updateID);
- if (us == null) {
- return;
- }
-
- boolean reserved = true;
- try {
- KeyExtent keyExtent = new KeyExtent(tkeyExtent);
- setUpdateTablet(us, keyExtent);
-
- if (us.currentTablet != null) {
- long additionalMutationSize = 0;
- List<Mutation> mutations = us.queuedMutations.get(us.currentTablet);
- for (TMutation tmutation : tmutations) {
- Mutation mutation = new ServerMutation(tmutation);
- mutations.add(mutation);
- additionalMutationSize += mutation.numBytes();
- }
- us.queuedMutationSize += additionalMutationSize;
- long totalQueued =
updateTotalQueuedMutationSize(additionalMutationSize);
- long total = TabletServer.this.getConfiguration()
- .getAsBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
- if (totalQueued > total) {
- try {
- flush(us);
- } catch (HoldTimeoutException hte) {
- // Assumption is that the client has timed out and is gone. If
that's not the case,
- // then removing the session should cause the client to fail
- // in such a way that it retries.
- log.debug("HoldTimeoutException during applyUpdates, removing
session");
- sessionManager.removeSession(updateID, true);
- reserved = false;
- }
- }
- }
- } finally {
- if (reserved) {
- sessionManager.unreserveSession(us);
- }
- }
- }
-
- private void flush(UpdateSession us) {
-
- int mutationCount = 0;
- Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
- Map<CommitSession,TabletMutations> loggables = new HashMap<>();
- Throwable error = null;
-
- long pt1 = System.currentTimeMillis();
-
- boolean containsMetadataTablet = false;
- for (Tablet tablet : us.queuedMutations.keySet()) {
- if (tablet.getExtent().isMeta()) {
- containsMetadataTablet = true;
- }
- }
-
- if (!containsMetadataTablet && us.queuedMutations.size() > 0) {
- TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
- }
-
- try (TraceScope prep = Trace.startSpan("prep")) {
- for (Entry<Tablet,? extends List<Mutation>> entry :
us.queuedMutations.entrySet()) {
-
- Tablet tablet = entry.getKey();
- Durability durability =
- DurabilityImpl.resolveDurabilty(us.durability,
tablet.getDurability());
- List<Mutation> mutations = entry.getValue();
- if (mutations.size() > 0) {
- try {
- updateMetrics.addMutationArraySize(mutations.size());
-
- PreparedMutations prepared =
tablet.prepareMutationsForCommit(us.cenv, mutations);
-
- if (prepared.tabletClosed()) {
- if (us.currentTablet == tablet) {
- us.currentTablet = null;
- }
- us.failures.put(tablet.getExtent(),
us.successfulCommits.get(tablet));
- } else {
- if (!prepared.getNonViolators().isEmpty()) {
- List<Mutation> validMutations = prepared.getNonViolators();
- CommitSession session = prepared.getCommitSession();
- if (durability != Durability.NONE) {
- loggables.put(session,
- new TabletMutations(session, validMutations,
durability));
- }
- sendables.put(session, validMutations);
- }
-
- if (!prepared.getViolations().isEmpty()) {
- us.violations.add(prepared.getViolations());
- updateMetrics.addConstraintViolations(0);
- }
- // Use the size of the original mutation list, regardless of
how many mutations
- // did not violate constraints.
- mutationCount += mutations.size();
-
- }
- } catch (Throwable t) {
- error = t;
- log.error("Unexpected error preparing for commit", error);
- break;
- }
- }
- }
- }
-
- long pt2 = System.currentTimeMillis();
- us.prepareTimes.addStat(pt2 - pt1);
- updateAvgPrepTime(pt2 - pt1, us.queuedMutations.size());
-
- if (error != null) {
- sendables.forEach((commitSession, value) ->
commitSession.abortCommit());
- throw new RuntimeException(error);
- }
- try {
- try (TraceScope wal = Trace.startSpan("wal")) {
- while (true) {
- try {
- long t1 = System.currentTimeMillis();
-
- logger.logManyTablets(loggables);
-
- long t2 = System.currentTimeMillis();
- us.walogTimes.addStat(t2 - t1);
- updateWalogWriteTime((t2 - t1));
- break;
- } catch (IOException | FSError ex) {
- log.warn("logging mutations failed, retrying");
- } catch (Throwable t) {
- log.error("Unknown exception logging mutations, counts"
- + " for mutations in flight not decremented!", t);
- throw new RuntimeException(t);
- }
- }
- }
-
- try (TraceScope commit = Trace.startSpan("commit")) {
- long t1 = System.currentTimeMillis();
- sendables.forEach((commitSession, mutations) -> {
- commitSession.commit(mutations);
- KeyExtent extent = commitSession.getExtent();
-
- if (us.currentTablet != null && extent ==
us.currentTablet.getExtent()) {
- // because constraint violations may filter out some
- // mutations, for proper accounting with the client code,
- // need to increment the count based on the original
- // number of mutations from the client NOT the filtered number
- us.successfulCommits.increment(us.currentTablet,
- us.queuedMutations.get(us.currentTablet).size());
- }
- });
- long t2 = System.currentTimeMillis();
-
- us.flushTime += (t2 - pt1);
- us.commitTimes.addStat(t2 - t1);
-
- updateAvgCommitTime(t2 - t1, sendables.size());
- }
- } finally {
- us.queuedMutations.clear();
- if (us.currentTablet != null) {
- us.queuedMutations.put(us.currentTablet, new ArrayList<>());
- }
- updateTotalQueuedMutationSize(-us.queuedMutationSize);
- us.queuedMutationSize = 0;
- }
- us.totalUpdates += mutationCount;
- }
-
- private void updateWalogWriteTime(long time) {
- updateMetrics.addWalogWriteTime(time);
- }
-
- private void updateAvgCommitTime(long time, int size) {
- if (size > 0)
- updateMetrics.addCommitTime((long) (time / (double) size));
- }
-
- private void updateAvgPrepTime(long time, int size) {
- if (size > 0)
- updateMetrics.addCommitPrep((long) (time / (double) size));
- }
-
- @Override
- public UpdateErrors closeUpdate(TInfo tinfo, long updateID) throws
NoSuchScanIDException {
- final UpdateSession us = (UpdateSession)
sessionManager.removeSession(updateID);
- if (us == null) {
- throw new NoSuchScanIDException();
- }
-
- // clients may or may not see data from an update session while
- // it is in progress, however when the update session is closed
- // want to ensure that reads wait for the write to finish
- long opid = writeTracker.startWrite(us.queuedMutations.keySet());
-
- try {
- flush(us);
- } catch (HoldTimeoutException e) {
- // Assumption is that the client has timed out and is gone. If that's
not the case throw an
- // exception that will cause it to retry.
- log.debug("HoldTimeoutException during closeUpdate, reporting no such
session");
- throw new NoSuchScanIDException();
- } finally {
- writeTracker.finishWrite(opid);
- }
-
- if (log.isTraceEnabled()) {
- log.trace(
- String.format("UpSess %s %,d in %.3fs, at=[%s] ft=%.3fs(pt=%.3fs
lt=%.3fs ct=%.3fs)",
- TServerUtils.clientAddress.get(), us.totalUpdates,
- (System.currentTimeMillis() - us.startTime) / 1000.0,
us.authTimes.toString(),
- us.flushTime / 1000.0, us.prepareTimes.sum() / 1000.0,
us.walogTimes.sum() / 1000.0,
- us.commitTimes.sum() / 1000.0));
- }
- if (us.failures.size() > 0) {
- Entry<KeyExtent,Long> first = us.failures.entrySet().iterator().next();
- log.debug(String.format("Failures: %d, first extent %s successful
commits: %d",
- us.failures.size(), first.getKey().toString(), first.getValue()));
- }
- List<ConstraintViolationSummary> violations = us.violations.asList();
- if (violations.size() > 0) {
- ConstraintViolationSummary first =
us.violations.asList().iterator().next();
- log.debug(String.format("Violations: %d, first %s occurs %d",
violations.size(),
- first.violationDescription, first.numberOfViolatingMutations));
- }
- if (us.authFailures.size() > 0) {
- KeyExtent first = us.authFailures.keySet().iterator().next();
- log.debug(String.format("Authentication Failures: %d, first %s",
us.authFailures.size(),
- first.toString()));
- }
-
- return new UpdateErrors(Translator.translate(us.failures,
Translators.KET),
- Translator.translate(violations, Translators.CVST),
- Translator.translate(us.authFailures, Translators.KET));
- }
-
- @Override
- public void update(TInfo tinfo, TCredentials credentials, TKeyExtent
tkeyExtent,
- TMutation tmutation, TDurability tdurability)
- throws NotServingTabletException, ConstraintViolationException,
ThriftSecurityException {
-
- final TableId tableId = TableId.of(new String(tkeyExtent.getTable(),
UTF_8));
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canWrite(credentials, tableId, namespaceId)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
- final KeyExtent keyExtent = new KeyExtent(tkeyExtent);
- final Tablet tablet = getOnlineTablet(new KeyExtent(keyExtent));
- if (tablet == null) {
- throw new NotServingTabletException(tkeyExtent);
- }
- Durability tabletDurability = tablet.getDurability();
-
- if (!keyExtent.isMeta()) {
- try {
- TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
- } catch (HoldTimeoutException hte) {
- // Major hack. Assumption is that the client has timed out and is
gone. If that's not the
- // case, then throwing the following will let client know there
- // was a failure and it should retry.
- throw new NotServingTabletException(tkeyExtent);
- }
- }
-
- final long opid = writeTracker.startWrite(TabletType.type(keyExtent));
-
- try {
- final Mutation mutation = new ServerMutation(tmutation);
- final List<Mutation> mutations = Collections.singletonList(mutation);
-
- PreparedMutations prepared;
- try (TraceScope prep = Trace.startSpan("prep")) {
- prepared = tablet.prepareMutationsForCommit(
- new TservConstraintEnv(getContext(), security, credentials),
mutations);
- }
-
- if (prepared.tabletClosed()) {
- throw new NotServingTabletException(tkeyExtent);
- } else if (!prepared.getViolators().isEmpty()) {
- throw new ConstraintViolationException(
- Translator.translate(prepared.getViolations().asList(),
Translators.CVST));
- } else {
- CommitSession session = prepared.getCommitSession();
- Durability durability = DurabilityImpl
- .resolveDurabilty(DurabilityImpl.fromThrift(tdurability),
tabletDurability);
-
- // Instead of always looping on true, skip completely when
durability is NONE.
- while (durability != Durability.NONE) {
- try {
- try (TraceScope wal = Trace.startSpan("wal")) {
- logger.log(session, mutation, durability);
- }
- break;
- } catch (IOException ex) {
- log.warn("Error writing mutations to log", ex);
- }
- }
-
- try (TraceScope commit = Trace.startSpan("commit")) {
- session.commit(mutations);
- }
- }
- } finally {
- writeTracker.finishWrite(opid);
- }
- }
-
- private NamespaceId getNamespaceId(TCredentials credentials, TableId
tableId)
- throws ThriftSecurityException {
- try {
- return Tables.getNamespaceId(getContext(), tableId);
- } catch (TableNotFoundException e1) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.TABLE_DOESNT_EXIST);
- }
- }
-
- private void
checkConditions(Map<KeyExtent,List<ServerConditionalMutation>> updates,
- ArrayList<TCMResult> results, ConditionalSession cs, List<String>
symbols)
- throws IOException {
- Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter =
- updates.entrySet().iterator();
-
- final CompressedIterators compressedIters = new
CompressedIterators(symbols);
- ConditionCheckerContext checkerContext = new
ConditionCheckerContext(getContext(),
- compressedIters, getContext().getTableConfiguration(cs.tableId));
-
- while (iter.hasNext()) {
- final Entry<KeyExtent,List<ServerConditionalMutation>> entry =
iter.next();
- final Tablet tablet = getOnlineTablet(entry.getKey());
-
- if (tablet == null || tablet.isClosed()) {
- for (ServerConditionalMutation scm : entry.getValue()) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- }
- iter.remove();
- } else {
- final List<ServerConditionalMutation> okMutations =
- new ArrayList<>(entry.getValue().size());
- final List<TCMResult> resultsSubList =
results.subList(results.size(), results.size());
-
- ConditionChecker checker =
- checkerContext.newChecker(entry.getValue(), okMutations,
resultsSubList);
- try {
- tablet.checkConditions(checker, cs.auths, cs.interruptFlag);
-
- if (okMutations.size() > 0) {
- entry.setValue(okMutations);
- } else {
- iter.remove();
- }
- } catch (TabletClosedException | IterationInterruptedException
- | TooManyFilesException e) {
- // clear anything added while checking conditions.
- resultsSubList.clear();
-
- for (ServerConditionalMutation scm : entry.getValue()) {
- results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED));
- }
- iter.remove();
- }
- }
- }
- }
-
- private void
writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>>
updates,
- ArrayList<TCMResult> results, ConditionalSession sess) {
- Set<Entry<KeyExtent,List<ServerConditionalMutation>>> es =
updates.entrySet();
-
- Map<CommitSession,List<Mutation>> sendables = new HashMap<>();
- Map<CommitSession,TabletMutations> loggables = new HashMap<>();
-
- boolean sessionCanceled = sess.interruptFlag.get();
-
- try (TraceScope prepSpan = Trace.startSpan("prep")) {
- long t1 = System.currentTimeMillis();
- for (Entry<KeyExtent,List<ServerConditionalMutation>> entry : es) {
- final Tablet tablet = getOnlineTablet(entry.getKey());
- if (tablet == null || tablet.isClosed() || sessionCanceled) {
- addMutationsAsTCMResults(results, entry.getValue(),
TCMStatus.IGNORED);
- } else {
- final Durability durability =
- DurabilityImpl.resolveDurabilty(sess.durability,
tablet.getDurability());
-
- @SuppressWarnings("unchecked")
- List<Mutation> mutations = (List<Mutation>) (List<? extends
Mutation>) entry.getValue();
- if (!mutations.isEmpty()) {
-
- PreparedMutations prepared = tablet.prepareMutationsForCommit(
- new TservConstraintEnv(getContext(), security,
sess.credentials), mutations);
-
- if (prepared.tabletClosed()) {
- addMutationsAsTCMResults(results, mutations,
TCMStatus.IGNORED);
- } else {
- if (!prepared.getNonViolators().isEmpty()) {
- // Only log and commit mutations that did not violate
constraints.
- List<Mutation> validMutations = prepared.getNonViolators();
- addMutationsAsTCMResults(results, validMutations,
TCMStatus.ACCEPTED);
- CommitSession session = prepared.getCommitSession();
- if (durability != Durability.NONE) {
- loggables.put(session,
- new TabletMutations(session, validMutations,
durability));
- }
- sendables.put(session, validMutations);
- }
-
- if (!prepared.getViolators().isEmpty()) {
- addMutationsAsTCMResults(results, prepared.getViolators(),
TCMStatus.VIOLATED);
- }
- }
- }
- }
- }
-
- long t2 = System.currentTimeMillis();
- updateAvgPrepTime(t2 - t1, es.size());
- }
-
- try (TraceScope walSpan = Trace.startSpan("wal")) {
- while (loggables.size() > 0) {
- try {
- long t1 = System.currentTimeMillis();
- logger.logManyTablets(loggables);
- long t2 = System.currentTimeMillis();
- updateWalogWriteTime(t2 - t1);
- break;
- } catch (IOException | FSError ex) {
- log.warn("logging mutations failed, retrying");
- } catch (Throwable t) {
- log.error("Unknown exception logging mutations, counts for"
- + " mutations in flight not decremented!", t);
- throw new RuntimeException(t);
- }
- }
- }
-
- try (TraceScope commitSpan = Trace.startSpan("commit")) {
- long t1 = System.currentTimeMillis();
- sendables.forEach(CommitSession::commit);
- long t2 = System.currentTimeMillis();
- updateAvgCommitTime(t2 - t1, sendables.size());
- }
- }
-
- /**
- * Transform and add each mutation as a {@link TCMResult} with the
mutation's ID and the
- * specified status to the {@link TCMResult} list.
- */
- private void addMutationsAsTCMResults(final List<TCMResult> list,
- final Collection<? extends Mutation> mutations, final TCMStatus
status) {
- mutations.stream()
- .map(mutation -> new TCMResult(((ServerConditionalMutation)
mutation).getID(), status))
- .forEach(list::add);
- }
-
- private Map<KeyExtent,List<ServerConditionalMutation>>
conditionalUpdate(ConditionalSession cs,
- Map<KeyExtent,List<ServerConditionalMutation>> updates,
ArrayList<TCMResult> results,
- List<String> symbols) throws IOException {
- // sort each list of mutations, this is done to avoid deadlock and doing
seeks in order is
- // more efficient and detect duplicate rows.
- ConditionalMutationSet.sortConditionalMutations(updates);
-
- Map<KeyExtent,List<ServerConditionalMutation>> deferred = new
HashMap<>();
-
- // can not process two mutations for the same row, because one will not
see what the other
- // writes
- ConditionalMutationSet.deferDuplicatesRows(updates, deferred);
-
- // get as many locks as possible w/o blocking... defer any rows that are
locked
- List<RowLock> locks = rowLocks.acquireRowlocks(updates, deferred);
- try {
- try (TraceScope checkSpan = Trace.startSpan("Check conditions")) {
- checkConditions(updates, results, cs, symbols);
- }
-
- try (TraceScope updateSpan = Trace.startSpan("apply conditional
mutations")) {
- writeConditionalMutations(updates, results, cs);
- }
- } finally {
- rowLocks.releaseRowLocks(locks);
- }
- return deferred;
- }
-
- @Override
- public TConditionalSession startConditionalUpdate(TInfo tinfo,
TCredentials credentials,
- List<ByteBuffer> authorizations, String tableIdStr, TDurability
tdurabilty,
- String classLoaderContext) throws ThriftSecurityException, TException {
-
- TableId tableId = TableId.of(tableIdStr);
- Authorizations userauths = null;
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
- if (!security.canConditionallyUpdate(credentials, tableId, namespaceId))
{
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- userauths = security.getUserAuthorizations(credentials);
- for (ByteBuffer auth : authorizations) {
- if (!userauths.contains(ByteBufferUtil.toBytes(auth))) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.BAD_AUTHORIZATIONS);
- }
- }
-
- ConditionalSession cs = new ConditionalSession(credentials,
- new Authorizations(authorizations), tableId,
DurabilityImpl.fromThrift(tdurabilty));
-
- long sid = sessionManager.createSession(cs, false);
- return new TConditionalSession(sid, lockID,
sessionManager.getMaxIdleTime());
- }
-
- @Override
- public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
- Map<TKeyExtent,List<TConditionalMutation>> mutations, List<String>
symbols)
- throws NoSuchScanIDException, TException {
-
- ConditionalSession cs = (ConditionalSession)
sessionManager.reserveSession(sessID);
-
- if (cs == null || cs.interruptFlag.get()) {
- throw new NoSuchScanIDException();
- }
-
- if (!cs.tableId.equals(MetadataTable.ID) &&
!cs.tableId.equals(RootTable.ID)) {
- try {
- TabletServer.this.resourceManager.waitUntilCommitsAreEnabled();
- } catch (HoldTimeoutException hte) {
- // Assumption is that the client has timed out and is gone. If
that's not the case throw
- // an exception that will cause it to retry.
- log.debug("HoldTimeoutException during conditionalUpdate, reporting
no such session");
- throw new NoSuchScanIDException();
- }
- }
-
- TableId tid = cs.tableId;
- long opid = writeTracker.startWrite(TabletType.type(new KeyExtent(tid,
null, null)));
-
- try {
- Map<KeyExtent,List<ServerConditionalMutation>> updates =
Translator.translate(mutations,
- Translators.TKET, new
Translator.ListTranslator<>(ServerConditionalMutation.TCMT));
-
- for (KeyExtent ke : updates.keySet()) {
- if (!ke.getTableId().equals(tid)) {
- throw new IllegalArgumentException(
- "Unexpected table id " + tid + " != " + ke.getTableId());
- }
- }
-
- ArrayList<TCMResult> results = new ArrayList<>();
-
- Map<KeyExtent,List<ServerConditionalMutation>> deferred =
- conditionalUpdate(cs, updates, results, symbols);
-
- while (deferred.size() > 0) {
- deferred = conditionalUpdate(cs, deferred, results, symbols);
- }
-
- return results;
- } catch (IOException ioe) {
- throw new TException(ioe);
- } finally {
- writeTracker.finishWrite(opid);
- sessionManager.unreserveSession(sessID);
- }
- }
-
- @Override
- public void invalidateConditionalUpdate(TInfo tinfo, long sessID) {
- // this method should wait for any running conditional update to complete
- // after this method returns a conditional update should not be able to
start
-
- ConditionalSession cs = (ConditionalSession)
sessionManager.getSession(sessID);
- if (cs != null) {
- cs.interruptFlag.set(true);
- }
-
- cs = (ConditionalSession) sessionManager.reserveSession(sessID, true);
- if (cs != null) {
- sessionManager.removeSession(sessID, true);
- }
- }
-
- @Override
- public void closeConditionalUpdate(TInfo tinfo, long sessID) {
- sessionManager.removeSession(sessID, false);
- }
-
- @Override
- public void splitTablet(TInfo tinfo, TCredentials credentials, TKeyExtent
tkeyExtent,
- ByteBuffer splitPoint) throws NotServingTabletException,
ThriftSecurityException {
-
- TableId tableId = TableId.of(new
String(ByteBufferUtil.toBytes(tkeyExtent.table)));
- NamespaceId namespaceId = getNamespaceId(credentials, tableId);
-
- if (!security.canSplitTablet(credentials, tableId, namespaceId)) {
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
-
- KeyExtent keyExtent = new KeyExtent(tkeyExtent);
-
- Tablet tablet = getOnlineTablet(keyExtent);
- if (tablet == null) {
- throw new NotServingTabletException(tkeyExtent);
- }
-
- if (keyExtent.getEndRow() == null
- || !keyExtent.getEndRow().equals(ByteBufferUtil.toText(splitPoint)))
{
- try {
- if (TabletServer.this.splitTablet(tablet,
ByteBufferUtil.toBytes(splitPoint)) == null) {
- throw new NotServingTabletException(tkeyExtent);
- }
- } catch (IOException e) {
- log.warn("Failed to split " + keyExtent, e);
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public TabletServerStatus getTabletServerStatus(TInfo tinfo, TCredentials
credentials) {
- return getStats(sessionManager.getActiveScansPerTable());
- }
-
- @Override
- public List<TabletStats> getTabletStats(TInfo tinfo, TCredentials
credentials, String tableId) {
- List<TabletStats> result = new ArrayList<>();
- TableId text = TableId.of(tableId);
- KeyExtent start = new KeyExtent(text, new Text(), null);
- for (Entry<KeyExtent,Tablet> entry :
getOnlineTablets().tailMap(start).entrySet()) {
- KeyExtent ke = entry.getKey();
- if (ke.getTableId().compareTo(text) == 0) {
- Tablet tablet = entry.getValue();
- TabletStats stats = tablet.getTabletStats();
- stats.extent = ke.toThrift();
- stats.ingestRate = tablet.ingestRate();
- stats.queryRate = tablet.queryRate();
- stats.splitCreationTime = tablet.getSplitCreationTime();
- stats.numEntries = tablet.getNumEntries();
- result.add(stats);
- }
- }
- return result;
- }
-
- private void checkPermission(TCredentials credentials, String lock, final
String request)
- throws ThriftSecurityException {
- try {
- log.trace("Got {} message from user: {}", request,
credentials.getPrincipal());
- if (!security.canPerformSystemActions(credentials)) {
- log.warn("Got {} message from user: {}", request,
credentials.getPrincipal());
- throw new ThriftSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED);
- }
- } catch (ThriftSecurityException e) {
- log.warn("Got {} message from unauthenticatable user: {}", request,
e.getUser());
- if (getContext().getCredentials().getToken().getClass().getName()
- .equals(credentials.getTokenClassName())) {
- log.error("Got message from a service with a mismatched
configuration."
- + " Please ensure a compatible configuration.", e);
- }
- throw e;
- }
-
- if (tabletServerLock == null || !tabletServerLock.wasLockAcquired()) {
- log.debug("Got {} message before my lock was acquired, ignoring...",
request);
- throw new RuntimeException("Lock not acquired");
- }
-
- if (tabletServerLock != null && tabletServerLock.wasLockAcquired()
- && !tabletServerLock.isLocked()) {
- Halt.halt(1, () -> {
- log.info("Tablet server no longer holds lock during
checkPermission() : {}, exiting",
- request);
- gcLogger.logGCInfo(TabletServer.this.getConfiguration());
- });
- }
-
- if (lock != null) {
- ZooUtil.LockID lid =
- new ZooUtil.LockID(getContext().getZooKeeperRoot() +
Constants.ZMASTER_LOCK, lock);
-
- try {
- if (!ZooLock.isLockHeld(masterLockCache, lid)) {
- // maybe the cache is out of date and a new master holds the
- // lock?
- masterLockCache.clear();
- if (!ZooLock.isLockHeld(masterLockCache, lid)) {
- log.warn("Got {} message from a master that does not hold the
current lock {}",
- request, lock);
- throw new RuntimeException("bad master lock");
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("bad master lock", e);
- }
- }
- }
-
- @Override
- public void loadTablet(TInfo tinfo, TCredentials credentials, String lock,
- final TKeyExtent textent) {
-
- try {
- checkPermission(credentials, lock, "loadTablet");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to load a tablet", e);
- throw new RuntimeException(e);
- }
-
- final KeyExtent extent = new KeyExtent(textent);
-
- synchronized (unopenedTablets) {
- synchronized (openingTablets) {
- synchronized (onlineTablets) {
-
- // checking if this exact tablet is in any of the sets
- // below is not a strong enough check
- // when splits and fix splits occurring
-
- Set<KeyExtent> unopenedOverlapping =
KeyExtent.findOverlapping(extent, unopenedTablets);
- Set<KeyExtent> openingOverlapping =
KeyExtent.findOverlapping(extent, openingTablets);
- Set<KeyExtent> onlineOverlapping =
- KeyExtent.findOverlapping(extent, onlineTablets.snapshot());
-
- Set<KeyExtent> all = new HashSet<>();
- all.addAll(unopenedOverlapping);
- all.addAll(openingOverlapping);
- all.addAll(onlineOverlapping);
-
- if (!all.isEmpty()) {
-
- // ignore any tablets that have recently split, for error logging
- for (KeyExtent e2 : onlineOverlapping) {
- Tablet tablet = getOnlineTablet(e2);
- if (System.currentTimeMillis() - tablet.getSplitCreationTime()
- < RECENTLY_SPLIT_MILLIES) {
- all.remove(e2);
- }
- }
-
- // ignore self, for error logging
- all.remove(extent);
-
- if (all.size() > 0) {
- log.error("Tablet {} overlaps previously assigned {} {} {}",
extent,
- unopenedOverlapping, openingOverlapping, onlineOverlapping
+ " " + all);
- }
- return;
- }
-
- unopenedTablets.add(extent);
- }
- }
- }
-
- TabletLogger.loading(extent, TabletServer.this.getTabletSession());
-
- final AssignmentHandler ah = new AssignmentHandler(extent);
- // final Runnable ah = new LoggingRunnable(log, );
- // Root tablet assignment must take place immediately
-
- if (extent.isRootTablet()) {
- new Daemon("Root Tablet Assignment") {
- @Override
- public void run() {
- ah.run();
- if (onlineTablets.snapshot().containsKey(extent)) {
- log.info("Root tablet loaded: {}", extent);
- } else {
- log.info("Root tablet failed to load");
- }
-
- }
- }.start();
- } else {
- if (extent.isMeta()) {
- resourceManager.addMetaDataAssignment(extent, log, ah);
- } else {
- resourceManager.addAssignment(extent, log, ah);
- }
- }
- }
-
- @Override
- public void unloadTablet(TInfo tinfo, TCredentials credentials, String
lock, TKeyExtent textent,
- TUnloadTabletGoal goal, long requestTime) {
- try {
- checkPermission(credentials, lock, "unloadTablet");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to unload a tablet", e);
- throw new RuntimeException(e);
- }
-
- KeyExtent extent = new KeyExtent(textent);
-
- resourceManager.addMigration(extent,
- new LoggingRunnable(log, new UnloadTabletHandler(extent, goal,
requestTime)));
- }
-
- @Override
- public void flush(TInfo tinfo, TCredentials credentials, String lock,
String tableId,
- ByteBuffer startRow, ByteBuffer endRow) {
- try {
- checkPermission(credentials, lock, "flush");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to flush a table", e);
- throw new RuntimeException(e);
- }
-
- ArrayList<Tablet> tabletsToFlush = new ArrayList<>();
-
- KeyExtent ke = new KeyExtent(TableId.of(tableId),
ByteBufferUtil.toText(endRow),
- ByteBufferUtil.toText(startRow));
-
- for (Tablet tablet : getOnlineTablets().values()) {
- if (ke.overlaps(tablet.getExtent())) {
- tabletsToFlush.add(tablet);
- }
- }
-
- Long flushID = null;
-
- for (Tablet tablet : tabletsToFlush) {
- if (flushID == null) {
- // read the flush id once from zookeeper instead of reading
- // it for each tablet
- try {
- flushID = tablet.getFlushID();
- } catch (NoNodeException e) {
- // table was probably deleted
- log.info("Asked to flush table that has no flush id {} {}", ke,
e.getMessage());
- return;
- }
- }
- tablet.flush(flushID);
- }
- }
-
- @Override
- public void flushTablet(TInfo tinfo, TCredentials credentials, String lock,
- TKeyExtent textent) {
- try {
- checkPermission(credentials, lock, "flushTablet");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to flush a tablet", e);
- throw new RuntimeException(e);
- }
-
- Tablet tablet = getOnlineTablet(new KeyExtent(textent));
- if (tablet != null) {
- log.info("Flushing {}", tablet.getExtent());
- try {
- tablet.flush(tablet.getFlushID());
- } catch (NoNodeException nne) {
- log.info("Asked to flush tablet that has no flush id {} {}", new
KeyExtent(textent),
- nne.getMessage());
- }
- }
- }
-
- @Override
- public void halt(TInfo tinfo, TCredentials credentials, String lock)
- throws ThriftSecurityException {
-
- checkPermission(credentials, lock, "halt");
-
- Halt.halt(0, () -> {
- log.info("Master requested tablet server halt");
- gcLogger.logGCInfo(TabletServer.this.getConfiguration());
- serverStopRequested = true;
- try {
- tabletServerLock.unlock();
- } catch (Exception e) {
- log.error("Caught exception unlocking TabletServer lock", e);
- }
- });
- }
-
- @Override
- public void fastHalt(TInfo info, TCredentials credentials, String lock) {
- try {
- halt(info, credentials, lock);
- } catch (Exception e) {
- log.warn("Error halting", e);
- }
- }
-
- @Override
- public TabletStats getHistoricalStats(TInfo tinfo, TCredentials
credentials) {
- return statsKeeper.getTabletStats();
- }
-
- @Override
- public List<ActiveScan> getActiveScans(TInfo tinfo, TCredentials
credentials)
- throws ThriftSecurityException, TException {
- try {
- checkPermission(credentials, null, "getScans");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to get active scans", e);
- throw e;
- }
-
- return sessionManager.getActiveScans();
- }
-
- @Override
- public void chop(TInfo tinfo, TCredentials credentials, String lock,
TKeyExtent textent) {
- try {
- checkPermission(credentials, lock, "chop");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to chop extent", e);
- throw new RuntimeException(e);
- }
-
- KeyExtent ke = new KeyExtent(textent);
-
- Tablet tablet = getOnlineTablet(ke);
- if (tablet != null) {
- tablet.chopFiles();
- }
- }
-
- @Override
- public void compact(TInfo tinfo, TCredentials credentials, String lock,
String tableId,
- ByteBuffer startRow, ByteBuffer endRow) {
- try {
- checkPermission(credentials, lock, "compact");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to compact a table", e);
- throw new RuntimeException(e);
- }
-
- KeyExtent ke = new KeyExtent(TableId.of(tableId),
ByteBufferUtil.toText(endRow),
- ByteBufferUtil.toText(startRow));
-
- ArrayList<Tablet> tabletsToCompact = new ArrayList<>();
-
- for (Tablet tablet : getOnlineTablets().values()) {
- if (ke.overlaps(tablet.getExtent())) {
- tabletsToCompact.add(tablet);
- }
- }
-
- Pair<Long,UserCompactionConfig> compactionInfo = null;
-
- for (Tablet tablet : tabletsToCompact) {
- // all for the same table id, so only need to read
- // compaction id once
- if (compactionInfo == null) {
- try {
- compactionInfo = tablet.getCompactionID();
- } catch (NoNodeException e) {
- log.info("Asked to compact table with no compaction id {} {}", ke,
e.getMessage());
- return;
- }
- }
- tablet.compactAll(compactionInfo.getFirst(),
compactionInfo.getSecond());
- }
-
- }
-
- @Override
- public List<ActiveCompaction> getActiveCompactions(TInfo tinfo,
TCredentials credentials)
- throws ThriftSecurityException, TException {
- try {
- checkPermission(credentials, null, "getActiveCompactions");
- } catch (ThriftSecurityException e) {
- log.error("Caller doesn't have permission to get active compactions",
e);
- throw e;
- }
-
- List<CompactionInfo> compactions = Compactor.getRunningCompactions();
- List<ActiveCompaction> ret = new ArrayList<>(compactions.size());
-
- for (CompactionInfo compactionInfo : compactions) {
- ret.add(compactionInfo.toThrift());
- }
-
- return ret;
- }
-
- @Override
- public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) {
- String log = logger.getLogFile();
- // Might be null if there no active logger
- if (log == null) {
- return Collections.emptyList();
- }
- return Collections.singletonList(log);
- }
-
- @Override
- public void removeLogs(TInfo tinfo, TCredentials credentials, List<String>
filenames) {
- log.warn("Garbage collector is attempting to remove logs through the
tablet server");
- log.warn("This is probably because your file"
- + " Garbage Collector is an older version than your tablet
servers.\n"
- + "Restart your file Garbage Collector.");
- }
-
- private TSummaries getSummaries(Future<SummaryCollection> future) throws
TimeoutException {
- try {
- SummaryCollection sc =
- future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS,
TimeUnit.MILLISECONDS);
- return sc.toThrift();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- private TSummaries handleTimeout(long sessionId) {
- long timeout =
-
TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
- sessionManager.removeIfNotAccessed(sessionId, timeout);
- return new TSummaries(false, sessionId, -1, -1, null);
- }
-
- private TSummaries startSummaryOperation(TCredentials credentials,
- Future<SummaryCollection> future) {
- try {
- return getSummaries(future);
- } catch (TimeoutException e) {
- long sid = sessionManager.createSession(new
SummarySession(credentials, future), false);
- while (sid == 0) {
- sessionManager.removeSession(sid);
- sid = sessionManager.createSession(new SummarySession(credentials,
future), false);
- }
- return handleTimeout(sid);
- }
- }
-
- @Override
- public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials,
- TSummaryRequest request)
- throws ThriftSecurityException, ThriftTableOperationException,
TException {
- NamespaceId namespaceId;
- TableId tableId = TableId.of(request.getTableId());
- try {
- namespaceId = Tables.getNamespaceId(getContext(), tableId);
- } catch (TableNotFoundException e1) {
- throw new ThriftTableOperationException(tableId.canonical(), null,
null,
- TableOperationExceptionType.NOTFOUND, null);
- }
-
- if (!security.canGetSummaries(credentials, tableId, namespaceId)) {
- throw new AccumuloSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED).asThriftException();
- }
-
- ExecutorService es = resourceManager.getSummaryPartitionExecutor();
- Future<SummaryCollection> future = new Gatherer(getContext(), request,
- getContext().getTableConfiguration(tableId),
getContext().getCryptoService()).gather(es);
-
- return startSummaryOperation(credentials, future);
- }
-
- @Override
- public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials
credentials,
- TSummaryRequest request, int modulus, int remainder)
- throws ThriftSecurityException, TException {
- // do not expect users to call this directly, expect other tservers to
call this method
- if (!security.canPerformSystemActions(credentials)) {
- throw new AccumuloSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED).asThriftException();
- }
-
- ExecutorService spe = resourceManager.getSummaryRemoteExecutor();
- TableConfiguration tableConfig =
- getContext().getTableConfiguration(TableId.of(request.getTableId()));
- Future<SummaryCollection> future =
- new Gatherer(getContext(), request, tableConfig,
getContext().getCryptoService())
- .processPartition(spe, modulus, remainder);
-
- return startSummaryOperation(credentials, future);
- }
-
- @Override
- public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials
credentials,
- TSummaryRequest request, Map<String,List<TRowRange>> files)
- throws ThriftSecurityException, TException {
- // do not expect users to call this directly, expect other tservers to
call this method
- if (!security.canPerformSystemActions(credentials)) {
- throw new AccumuloSecurityException(credentials.getPrincipal(),
- SecurityErrorCode.PERMISSION_DENIED).asThriftException();
- }
-
- ExecutorService srp = resourceManager.getSummaryRetrievalExecutor();
- TableConfiguration tableCfg =
- getContext().getTableConfiguration(TableId.of(request.getTableId()));
- BlockCache summaryCache = resourceManager.getSummaryCache();
- BlockCache indexCache = resourceManager.getIndexCache();
- Cache<String,Long> fileLenCache = resourceManager.getFileLenCache();
- FileSystemResolver volMgr = p -> fs.getFileSystemByPath(p);
- Future<SummaryCollection> future =
- new Gatherer(getContext(), request, tableCfg,
getContext().getCryptoService())
- .processFiles(volMgr, files, summaryCache, indexCache,
fileLenCache, srp);
-
- return startSummaryOperation(credentials, future);
- }
+ private final AtomicLong totalQueuedMutationSize = new AtomicLong(0);
+ private final ReentrantLock recoveryLock = new ReentrantLock(true);
+ private ThriftClientHandler clientHandler;
+ private final ServerBulkImportStatus bulkImportStatus = new
ServerBulkImportStatus();
- @Override
- public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId)
- throws NoSuchScanIDException, TException {
- SummarySession session = (SummarySession)
sessionManager.getSession(sessionId);
- if (session == null) {
- throw new NoSuchScanIDException();
- }
+ protected String getLockID() {
Review comment:
```suggestion
String getLockID() {
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services