[accumulo-testing] branch main updated: Add deletes to continuous ingest (#166)
This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git The following commit(s) were added to refs/heads/main by this push: new 711d28e Add deletes to continuous ingest (#166) 711d28e is described below commit 711d28e83a91cbaa0754064a7d72c0f975f7de31 Author: Dom G <47725857+domgargu...@users.noreply.github.com> AuthorDate: Fri Nov 19 13:04:02 2021 -0500 Add deletes to continuous ingest (#166) * Add the ability for the deletion of entries to occur while running continuous ingest --- conf/accumulo-testing.properties | 3 + .../org/apache/accumulo/testing/TestProps.java | 2 + .../testing/continuous/ContinuousIngest.java | 85 -- 3 files changed, 69 insertions(+), 21 deletions(-) diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties index 4f0e749..6ef5855 100644 --- a/conf/accumulo-testing.properties +++ b/conf/accumulo-testing.properties @@ -82,6 +82,9 @@ test.ci.ingest.pause.wait.max=180 test.ci.ingest.pause.duration.min=60 # Maximum pause duration (in seconds) test.ci.ingest.pause.duration.max=120 +# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest +# To disable deletes, set probability to 0 +test.ci.ingest.delete.probability=0.1 # Batch walker # diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java index c4c8948..e7801d3 100644 --- a/src/main/java/org/apache/accumulo/testing/TestProps.java +++ b/src/main/java/org/apache/accumulo/testing/TestProps.java @@ -94,6 +94,8 @@ public class TestProps { public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + "pause.duration.max"; // Amount of data to write before flushing. Pause checks are only done after flush. 
public static final String CI_INGEST_FLUSH_ENTRIES = CI_INGEST + "entries.flush"; + // The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest + public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + "delete.probability"; /** Batch Walker **/ // Sleep time between batch scans (in ms) diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java index 66f5152..459102f 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java @@ -74,6 +74,14 @@ public class ContinuousIngest { return (rand.nextInt(max - min) + min); } + private static float getDeleteProbability(Properties props) { +String stringValue = props.getProperty(TestProps.CI_INGEST_DELETE_PROBABILITY); +float prob = Float.parseFloat(stringValue); +Preconditions.checkArgument(prob >= 0.0 && prob <= 1.0, +"Delete probability should be between 0.0 and 1.0"); +return prob; + } + private static int getFlushEntries(Properties props) { return Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, "100")); } @@ -128,11 +136,8 @@ public class ContinuousIngest { // always want to point back to flushed data. This way the previous item should // always exist in accumulo when verifying data. To do this make insert N point // back to the row from insert (N - flushInterval). The array below is used to keep - // track of this. - long[] prevRows = new long[flushInterval]; - long[] firstRows = new long[flushInterval]; - int[] firstColFams = new int[flushInterval]; - int[] firstColQuals = new int[flushInterval]; + // track of all inserts. 
+ MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval]; long lastFlushTime = System.currentTimeMillis(); @@ -149,20 +154,21 @@ public class ContinuousIngest { log.info("INGESTING for " + pauseWaitSec + "s"); } + final float deleteProbability = getDeleteProbability(testProps); + log.info("DELETES will occur with a probability of {}", + String.format("%.02f", deleteProbability)); + out: while (true) { -// generate first set of nodes ColumnVisibility cv = getVisibility(r); +// generate first set of nodes for (int index = 0; index < flushInterval; index++) { long rowLong = genLong(rowMin, rowMax, r); - prevRows[index] = rowLong; - firstRows[index] = rowLong; int cf = r.nextInt(maxColF); int cq = r.nextInt(maxColQ); - firstColFams[index] = cf; - firstColQuals[index] = cq; + nodeMap[0][index] = new MutationInfo(rowLong, cf, cq); Mutation m = genMutation(rowLong, cf, cq, cv,
[accumulo] branch main updated: Create page in Monitor for external compactions (#2358)
This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new 2bce893 Create page in Monitor for external compactions (#2358) 2bce893 is described below commit 2bce8939145b49a765f998f5dbe2b6242a64e3e6 Author: Mike Miller AuthorDate: Fri Nov 19 10:42:48 2021 -0500 Create page in Monitor for external compactions (#2358) * Create multiple new classes for displaying 3 different tables of data in the new external compaction page in the monitor * Create 3 new ajax endpoints in ECResource * Modify Compactor and ExternalCompactionUtil to return Optional for the compaction coordinator instead of null * Add check for compaction coordinator to Monitor.fetchData() * New ExternalCompactionProgressIT for testing progress * Use new bootstrap panel and badges for coordinator info * Closes #2290 Co-authored-by: Dom G. <47725857+domgargu...@users.noreply.github.com> --- .../util/compaction/ExternalCompactionUtil.java| 10 +- .../org/apache/accumulo/compactor/Compactor.java | 8 +- .../java/org/apache/accumulo/monitor/Monitor.java | 89 + .../compactions/external/CompactionInputFile.java | 38 .../rest/compactions/external/CompactorInfo.java | 33 .../rest/compactions/external/Compactors.java | 40 .../rest/compactions/external/CoordinatorInfo.java | 41 + .../rest/compactions/external/ECResource.java | 62 +++ .../external/ExternalCompactionInfo.java | 60 ++ .../compactions/external/RunningCompactions.java | 39 .../compactions/external/RunningCompactorInfo.java | 133 ++ .../org/apache/accumulo/monitor/view/WebViews.java | 25 +++ .../org/apache/accumulo/monitor/resources/js/ec.js | 201 + .../org/apache/accumulo/monitor/templates/ec.ftl | 80 .../apache/accumulo/monitor/templates/navbar.ftl | 1 + .../compaction/ExternalCompactionProgressIT.java | 166 + .../compaction/ExternalCompactionTestUtils.java| 42 +++-- 17 
files changed, 1044 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java index a5538b6..24dff44 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -93,19 +94,18 @@ public class ExternalCompactionUtil { /** * - * @return null if Coordinator node not found, else HostAndPort + * @return Optional HostAndPort of Coordinator node if found */ - public static HostAndPort findCompactionCoordinator(ClientContext context) { + public static Optional<HostAndPort> findCompactionCoordinator(ClientContext context) { final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK; try { var zk = ZooSession.getAnonymousSession(context.getZooKeepers(), context.getZooKeepersSessionTimeOut()); byte[] address = ServiceLock.getLockData(zk, ServiceLock.path(lockPath)); if (null == address) { -return null; +return Optional.empty(); } - String coordinatorAddress = new String(address); - return HostAndPort.fromString(coordinatorAddress); + return Optional.of(HostAndPort.fromString(new String(address))); } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); } diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index 898177e..6565280 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -474,12 +474,12 @@ public class Compactor extends AbstractServer implements CompactorService.Iface * when unable to get client */ protected CompactionCoordinatorService.Client getCoordinatorClient() throws TTransportException { -HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext()); -if (null == coordinatorHost) { +var coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext()); +if (coordinatorHost.isEmpty()) { throw new TTransportException("Unable to get CompactionCoordinator