http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java index f58db38,0000000..1fb56ef mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/RestartStressIT.java @@@ -1,153 -1,0 +1,152 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.cluster.ClusterControl; +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import com.google.common.base.Charsets; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +public class RestartStressIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(RestartStressIT.class); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + Map<String,String> opts = cfg.getSiteConfig(); + opts.put(Property.TSERV_MAXMEM.getKey(), "100K"); + opts.put(Property.TSERV_MAJC_DELAY.getKey(), "100ms"); + opts.put(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1M"); + opts.put(Property.INSTANCE_ZK_TIMEOUT.getKey(), "15s"); + opts.put(Property.MASTER_RECOVERY_DELAY.getKey(), "1s"); + cfg.setSiteConfig(opts); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Override + protected int defaultTimeoutSeconds() { + return 10 * 60; + } + + private ExecutorService svc; + + @Before + public void setup() throws Exception { + svc = Executors.newFixedThreadPool(1); + } + + @After + public void teardown() throws Exception { + if (null == svc) { + return; + } + + if (!svc.isShutdown()) { + svc.shutdown(); + } + + while (!svc.awaitTermination(10, TimeUnit.SECONDS)) { + log.info("Waiting for threadpool to terminate"); + } + } + + private static final VerifyIngest.Opts VOPTS; + static { + VOPTS = new VerifyIngest.Opts(); + VOPTS.rows = 10 * 1000; + } + private static final ScannerOpts SOPTS = new ScannerOpts(); + + @Test + public void test() throws Exception { + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + final AuthenticationToken token = getAdminToken(); + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "500K"); + final ClusterControl control = getCluster().getClusterControl(); + final String[] args; + if (token instanceof PasswordToken) { + byte[] password = ((PasswordToken) token).getPassword(); - args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, Charsets.UTF_8), "-i", cluster.getInstanceName(), "-z", - cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName}; ++ args = new String[] {"-u", getAdminPrincipal(), "-p", new String(password, UTF_8), "-i", cluster.getInstanceName(), "-z", cluster.getZooKeepers(), ++ "--rows", "" + VOPTS.rows, "--table", tableName}; + } else if (token instanceof KerberosToken) { + ClusterUser rootUser = getAdminUser(); + args = new String[] {"-u", getAdminPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-i", cluster.getInstanceName(), "-z", + cluster.getZooKeepers(), "--rows", "" + VOPTS.rows, "--table", tableName}; + } else { + throw new RuntimeException("Unrecognized token"); + } + + Future<Integer> retCode = svc.submit(new Callable<Integer>() { + @Override + public Integer call() { + try { + return control.exec(TestIngest.class, args); + } catch (Exception e) { + log.error("Error running TestIngest", e); + return -1; + } + } + }); + + for (int i = 0; i < 2; i++) { + sleepUninterruptibly(10, TimeUnit.SECONDS); + control.stopAllServers(ServerType.TABLET_SERVER); + control.startAllServers(ServerType.TABLET_SERVER); + } + assertEquals(0, retCode.get().intValue()); + VOPTS.setTableName(tableName); + + if (token instanceof PasswordToken) { + VOPTS.setPrincipal(getAdminPrincipal()); + } else if (token instanceof KerberosToken) { + VOPTS.updateKerberosCredentials(cluster.getClientConfig()); + } else { + throw new RuntimeException("Unrecognized token"); + } + + VerifyIngest.verifyIngest(c, VOPTS, SOPTS); + } + +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java index 4f78b77,0000000..71956f4 mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ScanIdIT.java @@@ -1,390 -1,0 +1,390 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + - import static com.google.common.base.Charsets.UTF_8; ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.ActiveScan; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.ColumnVisibility; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +/** + * ACCUMULO-2641 Integration test. ACCUMULO-2641 Adds scan id to thrift protocol so that {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()} + * returns a unique scan id. + * + * <p> + * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions. The test exercises multiple + * tablet servers with splits and multiple ranges to force the scans to occur across multiple tablet servers for completeness. + * + * <p> + * This patch modified thrift, the TraceRepoDeserializationTest test seems to fail unless the following be added: + * + * <p> + * private static final long serialVersionUID = -4659975753252858243l; + * + * <p> + * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated. + */ +public class ScanIdIT extends AccumuloClusterHarness { + + private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class); + + private static final int NUM_SCANNERS = 8; + + private static final int NUM_DATA_ROWS = 100; + + private static final Random random = new Random(); + + private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS); + + private static final AtomicBoolean testInProgress = new AtomicBoolean(true); + + private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>(); + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + /** + * @throws Exception + * any exception is a test failure. + */ + @Test + public void testScanId() throws Exception { + + final String tableName = getUniqueNames(1)[0]; + Connector conn = getConnector(); + conn.tableOperations().create(tableName); + + addSplits(conn, tableName); + + log.info("Splits added"); + + generateSampleData(conn, tableName); + + log.info("Generated data for {}", tableName); + + attachSlowIterator(conn, tableName); + + CountDownLatch latch = new CountDownLatch(NUM_SCANNERS); + + for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) { + ScannerThread st = new ScannerThread(conn, scannerIndex, tableName, latch); + pool.submit(st); + } + + // wait for scanners to report a result. + while (testInProgress.get()) { + + if (resultsByWorker.size() < NUM_SCANNERS) { + log.trace("Results reported {}", resultsByWorker.size()); + sleepUninterruptibly(750, TimeUnit.MILLISECONDS); + } else { + // each worker has reported at least one result. + testInProgress.set(false); + + log.debug("Final result count {}", resultsByWorker.size()); + + // delay to allow scanners to react to end of test and cleanly close. + sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + } + + // all scanner have reported at least 1 result, so check for unique scan ids. + Set<Long> scanIds = new HashSet<Long>(); + + List<String> tservers = conn.instanceOperations().getTabletServers(); + + log.debug("tablet servers {}", tservers.toString()); + + for (String tserver : tservers) { + + List<ActiveScan> activeScans = null; + for (int i = 0; i < 10; i++) { + try { + activeScans = conn.instanceOperations().getActiveScans(tserver); + break; + } catch (AccumuloException e) { + if (e.getCause() instanceof TableNotFoundException) { + log.debug("Got TableNotFoundException, will retry"); + Thread.sleep(200); + continue; + } + throw e; + } + } + + assertNotNull("Repeatedly got exception trying to active scans", activeScans); + + log.debug("TServer {} has {} active scans", tserver, activeScans.size()); + + for (ActiveScan scan : activeScans) { + log.debug("Tserver {} scan id {}", tserver, scan.getScanid()); + scanIds.add(scan.getScanid()); + } + } + + assertTrue("Expected at least " + NUM_SCANNERS + " scanIds, but saw " + scanIds.size(), NUM_SCANNERS <= scanIds.size()); + + } + + /** + * Runs scanner in separate thread to allow multiple scanners to execute in parallel. + * <p/> + * The thread run method is terminated when the testInProgress flag is set to false. + */ + private static class ScannerThread implements Runnable { + + private final Connector connector; + private Scanner scanner = null; + private final int workerIndex; + private final String tablename; + private final CountDownLatch latch; + + public ScannerThread(final Connector connector, final int workerIndex, final String tablename, final CountDownLatch latch) { + this.connector = connector; + this.workerIndex = workerIndex; + this.tablename = tablename; + this.latch = latch; + } + + /** + * execute the scan across the sample data and put scan result into result map until testInProgress flag is set to false. + */ + @Override + public void run() { + + latch.countDown(); + try { + latch.await(); + } catch (InterruptedException e) { + log.error("Thread interrupted with id {}", workerIndex); + Thread.currentThread().interrupt(); + return; + } + + log.debug("Creating scanner in worker thread {}", workerIndex); + + try { + + scanner = connector.createScanner(tablename, new Authorizations()); + + // Never start readahead + scanner.setReadaheadThreshold(Long.MAX_VALUE); + scanner.setBatchSize(1); + + // create different ranges to try to hit more than one tablet. + scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9"))); + + } catch (TableNotFoundException e) { + throw new IllegalStateException("Initialization failure. Could not create scanner", e); + } + + scanner.fetchColumnFamily(new Text("fam1")); + + for (Map.Entry<Key,Value> entry : scanner) { + + // exit when success condition is met. + if (!testInProgress.get()) { + scanner.clearScanIterators(); + scanner.close(); + + return; + } + + Text row = entry.getKey().getRow(); + + log.debug("worker {}, row {}", workerIndex, row.toString()); + + if (entry.getValue() != null) { + + Value prevValue = resultsByWorker.put(workerIndex, entry.getValue()); + + // value should always being increasing + if (prevValue != null) { + + log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue())); + + assertTrue(prevValue.compareTo(entry.getValue()) > 0); + } + } else { + log.info("Scanner returned null"); + fail("Scanner returned unexpected null value"); + } + + } + + log.debug("Scanner ran out of data. (info only, not an error) "); + + } + } + + /** + * Create splits on table and force migration by taking table offline and then bring back online for test. + * + * @param conn + * Accumulo connector Accumulo connector to test cluster or MAC instance. + */ + private void addSplits(final Connector conn, final String tableName) { + + SortedSet<Text> splits = createSplits(); + + try { + + conn.tableOperations().addSplits(tableName, splits); + + conn.tableOperations().offline(tableName, true); + + sleepUninterruptibly(2, TimeUnit.SECONDS); + conn.tableOperations().online(tableName, true); + + for (Text split : conn.tableOperations().listSplits(tableName)) { + log.trace("Split {}", split); + } + + } catch (AccumuloSecurityException e) { + throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e); + } catch (TableNotFoundException e) { + throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e); + } catch (AccumuloException e) { + throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e); + } + + } + + /** + * Create splits to distribute data across multiple tservers. + * + * @return splits in sorted set for addSplits. + */ + private SortedSet<Text> createSplits() { + + SortedSet<Text> splits = new TreeSet<Text>(); + + for (int split = 0; split < 10; split++) { + splits.add(new Text(Integer.toString(split))); + } + + return splits; + } + + /** + * Generate some sample data using random row id to distribute across splits. + * <p/> + * The primary goal is to determine that each scanner is assigned a unique scan id. This test does check that the count value for fam1 increases if a scanner + * reads multiple value, but this is secondary consideration for this test, that is included for completeness. + * + * @param connector + * Accumulo connector Accumulo connector to test cluster or MAC instance. + */ + private void generateSampleData(Connector connector, final String tablename) { + + try { + + BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig()); + + ColumnVisibility vis = new ColumnVisibility("public"); + + for (int i = 0; i < NUM_DATA_ROWS; i++) { + + Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i))); + + Mutation m = new Mutation(rowId); + m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8))); + m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8))); + m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8))); + + log.trace("Added row {}", rowId); + + bw.addMutation(m); + } + + bw.close(); + } catch (TableNotFoundException ex) { + throw new IllegalStateException("Initialization failed. Could not create test data", ex); + } catch (MutationsRejectedException ex) { + throw new IllegalStateException("Initialization failed. Could not create test data", ex); + } + } + + /** + * Attach the test slow iterator so that we have time to read the scan id without creating a large dataset. Uses a fairly large sleep and delay times because + * we are not concerned with how much data is read and we do not read all of the data - the test stops once each scanner reports a scan id. + * + * @param connector + * Accumulo connector Accumulo connector to test cluster or MAC instance. + */ + private void attachSlowIterator(Connector connector, final String tablename) { + try { + + IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator"); + slowIter.addOption("sleepTime", "200"); + slowIter.addOption("seekSleepTime", "200"); + + connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan)); + + } catch (AccumuloException ex) { + throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex); + } catch (TableNotFoundException ex) { + throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex); + } catch (AccumuloSecurityException ex) { + throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/2c8088e9/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java ---------------------------------------------------------------------- diff --cc test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java index 11694fd,0000000..25896ba mode 100644,000000..100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java @@@ -1,224 -1,0 +1,223 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + ++import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; ++import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ScannerOpts; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.impl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.util.CheckForMetadataProblems; +import org.apache.accumulo.test.TestIngest; +import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + - import com.google.common.base.Charsets; - import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - +public class SplitIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(SplitIT.class); + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_MAXMEM, "5K"); + cfg.setProperty(Property.TSERV_MAJC_DELAY, "100ms"); + } + + @Override + protected int defaultTimeoutSeconds() { + return 4 * 60; + } + + private String tservMaxMem, tservMajcDelay; + + @Before + public void alterConfig() throws Exception { + Assume.assumeTrue(ClusterType.MINI == getClusterType()); + + InstanceOperations iops = getConnector().instanceOperations(); + Map<String,String> config = iops.getSystemConfiguration(); + tservMaxMem = config.get(Property.TSERV_MAXMEM.getKey()); + tservMajcDelay = config.get(Property.TSERV_MAJC_DELAY.getKey()); + + if (!tservMajcDelay.equals("100ms")) { + iops.setProperty(Property.TSERV_MAJC_DELAY.getKey(), "100ms"); + } + + // Property.TSERV_MAXMEM can't be altered on a running server + boolean restarted = false; + if (!tservMaxMem.equals("5K")) { + iops.setProperty(Property.TSERV_MAXMEM.getKey(), "5K"); + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + restarted = true; + } + + // If we restarted the tservers, we don't need to re-wait for the majc delay + if (!restarted) { + long millis = AccumuloConfiguration.getTimeInMillis(tservMajcDelay); + log.info("Waiting for majc delay period: {}ms", millis); + Thread.sleep(millis); + log.info("Finished waiting for majc delay period"); + } + } + + @After + public void resetConfig() throws Exception { + if (null != tservMaxMem) { + log.info("Resetting {}={}", Property.TSERV_MAXMEM.getKey(), tservMaxMem); + getConnector().instanceOperations().setProperty(Property.TSERV_MAXMEM.getKey(), tservMaxMem); + tservMaxMem = null; + getCluster().getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getCluster().getClusterControl().startAllServers(ServerType.TABLET_SERVER); + } + if (null != tservMajcDelay) { + log.info("Resetting {}={}", Property.TSERV_MAJC_DELAY.getKey(), tservMajcDelay); + getConnector().instanceOperations().setProperty(Property.TSERV_MAJC_DELAY.getKey(), tservMajcDelay); + tservMajcDelay = null; + } + } + + @Test + public void tabletShouldSplit() throws Exception { + Connector c = getConnector(); + String table = getUniqueNames(1)[0]; + c.tableOperations().create(table); + c.tableOperations().setProperty(table, Property.TABLE_SPLIT_THRESHOLD.getKey(), "256K"); + c.tableOperations().setProperty(table, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "1K"); + TestIngest.Opts opts = new TestIngest.Opts(); + VerifyIngest.Opts vopts = new VerifyIngest.Opts(); + opts.rows = 100000; + opts.setTableName(table); + + ClientConfiguration clientConfig = cluster.getClientConfig(); + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + opts.updateKerberosCredentials(clientConfig); + vopts.updateKerberosCredentials(clientConfig); + } else { + opts.setPrincipal(getAdminPrincipal()); + vopts.setPrincipal(getAdminPrincipal()); + } + + TestIngest.ingest(c, opts, new BatchWriterOpts()); + vopts.rows = opts.rows; + vopts.setTableName(table); + VerifyIngest.verifyIngest(c, vopts, new ScannerOpts()); + while (c.tableOperations().listSplits(table).size() < 10) { + sleepUninterruptibly(15, TimeUnit.SECONDS); + } + String id = c.tableOperations().tableIdMap().get(table); + Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + KeyExtent extent = new KeyExtent(new Text(id), null, null); + s.setRange(extent.toMetadataRange()); + MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.fetch(s); + int count = 0; + int shortened = 0; + for (Entry<Key,Value> entry : s) { + extent = new KeyExtent(entry.getKey().getRow(), entry.getValue()); + if (extent.getEndRow() != null && extent.getEndRow().toString().length() < 14) + shortened++; + count++; + } + + assertTrue("Shortened should be greater than zero: " + shortened, shortened > 0); + assertTrue("Count should be cgreater than 10: " + count, count > 10); + + String[] args; + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + ClusterUser rootUser = getAdminUser(); + args = new String[] {"-i", cluster.getInstanceName(), "-u", rootUser.getPrincipal(), "--keytab", rootUser.getKeytab().getAbsolutePath(), "-z", + cluster.getZooKeepers()}; + } else { + PasswordToken token = (PasswordToken) getAdminToken(); - args = new String[] {"-i", cluster.getInstanceName(), "-u", "root", "-p", new String(token.getPassword(), Charsets.UTF_8), "-z", cluster.getZooKeepers()}; ++ args = new String[] {"-i", cluster.getInstanceName(), "-u", "root", "-p", new String(token.getPassword(), UTF_8), "-z", cluster.getZooKeepers()}; + } + + assertEquals(0, getCluster().getClusterControl().exec(CheckForMetadataProblems.class, args)); + } + + @Test + public void interleaveSplit() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"); + c.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none"); + sleepUninterruptibly(5, TimeUnit.SECONDS); + ReadWriteIT.interleaveTest(c, tableName); + sleepUninterruptibly(5, TimeUnit.SECONDS); + int numSplits = c.tableOperations().listSplits(tableName).size(); + while (numSplits <= 20) { + log.info("Waiting for splits to happen"); + Thread.sleep(2000); + numSplits = c.tableOperations().listSplits(tableName).size(); + } + assertTrue("Expected at least 20 splits, saw " + numSplits, numSplits > 20); + } + + @Test + public void deleteSplit() throws Exception { + Connector c = getConnector(); + String tableName = getUniqueNames(1)[0]; + c.tableOperations().create(tableName); + c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K"); + ClientConfiguration clientConfig = getCluster().getClientConfig(); + String password = null, keytab = null; + if (clientConfig.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + keytab = getAdminUser().getKeytab().getAbsolutePath(); + } else { - password = new String(((PasswordToken) getAdminToken()).getPassword(), Charsets.UTF_8); ++ password = new String(((PasswordToken) getAdminToken()).getPassword(), UTF_8); + } + DeleteIT.deleteTest(c, getCluster(), getAdminPrincipal(), password, tableName, keytab); + c.tableOperations().flush(tableName, null, null, true); + for (int i = 0; i < 5; i++) { + sleepUninterruptibly(10, TimeUnit.SECONDS); + if (c.tableOperations().listSplits(tableName).size() > 20) + break; + } + assertTrue(c.tableOperations().listSplits(tableName).size() > 20); + } + +}