This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push: new 8d87c67 Drop Replication concurrent RW test and IT (#205) 8d87c67 is described below commit 8d87c676c558ffe8fe38062a95c49d2e501942c3 Author: Mike Miller <mmil...@apache.org> AuthorDate: Mon May 9 14:00:03 2022 -0400 Drop Replication concurrent RW test and IT (#205) * Closes #171 --- .../testing/randomwalk/concurrent/Replication.java | 204 --------------------- .../randomwalk/ReplicationRandomWalkIT.java | 58 ------ 2 files changed, 262 deletions(-) diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Replication.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Replication.java deleted file mode 100644 index 444d000..0000000 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Replication.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.testing.randomwalk.concurrent; - -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static org.apache.accumulo.core.conf.Property.MANAGER_REPLICATION_SCAN_INTERVAL; -import static org.apache.accumulo.core.conf.Property.REPLICATION_NAME; -import static org.apache.accumulo.core.conf.Property.REPLICATION_PEERS; -import static org.apache.accumulo.core.conf.Property.REPLICATION_PEER_PASSWORD; -import static org.apache.accumulo.core.conf.Property.REPLICATION_PEER_USER; -import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_ASSIGNMENT_SLEEP; -import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_PROCESSOR_DELAY; -import static org.apache.accumulo.core.conf.Property.REPLICATION_WORK_PROCESSOR_PERIOD; -import static org.apache.accumulo.core.conf.Property.TABLE_REPLICATION; -import static org.apache.accumulo.core.conf.Property.TABLE_REPLICATION_TARGET; - -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Random; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.admin.InstanceOperations; -import org.apache.accumulo.core.client.admin.TableOperations; -import org.apache.accumulo.core.conf.ClientProperty; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.testing.TestProps; -import org.apache.accumulo.testing.randomwalk.RandWalkEnv; -import org.apache.accumulo.testing.randomwalk.State; -import org.apache.accumulo.testing.randomwalk.Test; -import org.apache.hadoop.io.Text; - -@Deprecated -public class Replication extends Test { - - final int ROWS = 1000; - final int COLS = 50; - - @Override - public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - final AccumuloClient c = env.getAccumuloClient(); - final String instName = ClientProperty.INSTANCE_NAME.getValue(c.properties()); - final String zookeepers = ClientProperty.INSTANCE_ZOOKEEPERS.getValue(c.properties()); - final InstanceOperations iOps = c.instanceOperations(); - final TableOperations tOps = c.tableOperations(); - - // Replicate to ourselves - iOps.setProperty(REPLICATION_NAME.getKey(), instName); - iOps.setProperty(REPLICATION_PEERS.getKey() + instName, - "org.apache.accumulo.tserver.replication.AccumuloReplicaSystem," + instName + "," - + zookeepers); - iOps.setProperty(REPLICATION_PEER_USER.getKey() + instName, env.getAccumuloUserName()); - iOps.setProperty(REPLICATION_PEER_PASSWORD.getKey() + instName, env.getAccumuloPassword()); - // Tweak some replication parameters to make the replication go faster - iOps.setProperty(MANAGER_REPLICATION_SCAN_INTERVAL.getKey(), "1s"); - iOps.setProperty(REPLICATION_WORK_ASSIGNMENT_SLEEP.getKey(), "1s"); - iOps.setProperty(REPLICATION_WORK_PROCESSOR_DELAY.getKey(), "1s"); - iOps.setProperty(REPLICATION_WORK_PROCESSOR_PERIOD.getKey(), "1s"); - - // Ensure the replication table is online - tOps.online(TestProps.REPLICATION_TABLE_NAME, true); - boolean online = tOps.isOnline(TestProps.REPLICATION_TABLE_NAME); - for (int i = 0; i < 10; i++) { - if (online) - break; - sleepUninterruptibly(2, TimeUnit.SECONDS); - online = tOps.isOnline(TestProps.REPLICATION_TABLE_NAME); - } - assertTrue("Replication table was not online", online); - - // Make a source and destination table - final String sourceTable = ("repl-source-" + UUID.randomUUID()).replace('-', '_'); - final String destTable = ("repl-dest-" + UUID.randomUUID()).replace('-', '_'); - final String[] tables = new String[] {sourceTable, destTable}; - - for (String tableName : tables) { - log.debug("creating " + tableName); - tOps.create(tableName); - } - - // Point the source to the destination - final String destID = tOps.tableIdMap().get(destTable); - tOps.setProperty(sourceTable, TABLE_REPLICATION.getKey(), "true"); - tOps.setProperty(sourceTable, TABLE_REPLICATION_TARGET.getKey() + instName, destID); - - // zookeeper propagation wait - sleepUninterruptibly(5, TimeUnit.SECONDS); - - // Maybe split the tables - Random rand = new Random(System.currentTimeMillis()); - for (String tableName : tables) { - if (rand.nextBoolean()) { - splitTable(tOps, tableName); - } - } - - // write some checkable data - BatchWriter bw = c.createBatchWriter(sourceTable, null); - for (int row = 0; row < ROWS; row++) { - Mutation m = new Mutation(itos(row)); - for (int col = 0; col < COLS; col++) { - m.put("", itos(col), ""); - } - bw.addMutation(m); - } - bw.close(); - - // attempt to force the WAL to roll so replication begins - final Set<String> origRefs = c.replicationOperations().referencedFiles(sourceTable); - // write some data we will ignore - while (true) { - final Set<String> updatedFileRefs = c.replicationOperations().referencedFiles(sourceTable); - updatedFileRefs.retainAll(origRefs); - log.debug("updateFileRefs size " + updatedFileRefs.size()); - if (updatedFileRefs.isEmpty()) { - break; - } - bw = c.createBatchWriter(sourceTable, null); - for (int row = 0; row < ROWS; row++) { - Mutation m = new Mutation(itos(row)); - for (int col = 0; col < COLS; col++) { - m.put("ignored", itos(col), ""); - } - bw.addMutation(m); - } - bw.close(); - } - - // wait a little while for replication to take place - sleepUninterruptibly(30, TimeUnit.SECONDS); - - // check the data - Scanner scanner = c.createScanner(destTable, Authorizations.EMPTY); - scanner.fetchColumnFamily(new Text("")); - int row = 0; - int col = 0; - for (Entry<Key,Value> entry : scanner) { - assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString())); - assertEquals(col, Integer.parseInt(entry.getKey().getColumnQualifier().toString())); - col++; - if (col == COLS) { - row++; - col = 0; - } - } - assertEquals(ROWS, row); - assertEquals(0, col); - - // cleanup - for (String tableName : tables) { - log.debug("Deleting " + tableName); - tOps.delete(tableName); - } - } - - // junit isn't a dependency - private void assertEquals(int expected, int actual) { - if (expected != actual) - throw new RuntimeException( - String.format("%d fails to match expected value %d", actual, expected)); - } - - // junit isn't a dependency - private void assertTrue(String string, boolean test) { - if (!test) - throw new RuntimeException(string); - } - - private static String itos(int i) { - return String.format("%08d", i); - } - - private void splitTable(TableOperations tOps, String tableName) throws Exception { - SortedSet<Text> splits = new TreeSet<>(); - for (int i = 1; i <= 9; i++) { - splits.add(new Text(itos(i * (ROWS / 10)))); - } - log.debug("Adding splits to " + tableName); - tOps.addSplits(tableName, splits); - } -} diff --git a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java b/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java deleted file mode 100644 index 8b428c1..0000000 --- a/src/test/java/org/apache/accumulo/testing/randomwalk/ReplicationRandomWalkIT.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.testing.randomwalk; - -import static java.util.concurrent.TimeUnit.MINUTES; -import static org.apache.accumulo.core.conf.Property.TSERV_NATIVEMAP_ENABLED; -import static org.apache.accumulo.core.conf.Property.TSERV_WAL_MAX_SIZE; - -import org.apache.accumulo.core.client.Accumulo; -import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.ConfigurableMacBase; -import org.apache.hadoop.conf.Configuration; -import org.easymock.EasyMock; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - -@Disabled("Replication ITs are not stable and not currently maintained") -public class ReplicationRandomWalkIT extends ConfigurableMacBase { - - @Override - protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setProperty(TSERV_WAL_MAX_SIZE, "1M"); - cfg.setProperty(TSERV_NATIVEMAP_ENABLED, "false"); - cfg.setNumTservers(1); - } - - @Deprecated - @Test - @Timeout(value = 5, unit = MINUTES) - public void runReplicationRandomWalkStep() throws Exception { - var r = new org.apache.accumulo.testing.randomwalk.concurrent.Replication(); - - RandWalkEnv env = EasyMock.createMock(RandWalkEnv.class); - EasyMock.expect(env.getAccumuloUserName()).andReturn("root").anyTimes(); - EasyMock.expect(env.getAccumuloPassword()).andReturn(ROOT_PASSWORD).anyTimes(); - AccumuloClient client = Accumulo.newClient().from(this.getClientProperties()).build(); - EasyMock.expect(env.getAccumuloClient()).andReturn(client).anyTimes(); - EasyMock.replay(env); - - r.visit(null, env, null); - } -}