http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java b/test/src/main/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java new file mode 100644 index 0000000..4d233a5 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/harness/conf/AccumuloMiniClusterConfiguration.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.harness.conf; + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.harness.AccumuloClusterHarness.ClusterType; +import org.apache.accumulo.harness.MiniClusterHarness; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * Extract configuration properties for a MiniAccumuloCluster from Java properties + */ +public class AccumuloMiniClusterConfiguration extends AccumuloClusterPropertyConfiguration { + private static final Logger log = LoggerFactory.getLogger(AccumuloMiniClusterConfiguration.class); + private static final String TRUE = Boolean.toString(true); + + public static final String ACCUMULO_MINI_PRINCIPAL_KEY = ACCUMULO_MINI_PREFIX + "principal"; + public static final String ACCUMULO_MINI_PRINCIPAL_DEFAULT = "root"; + public static final String ACCUMULO_MINI_PASSWORD_KEY = ACCUMULO_MINI_PREFIX + "password"; + public static final String ACCUMULO_MINI_PASSWORD_DEFAULT = "rootPassword1"; + + private final Map<String,String> conf; + private final boolean saslEnabled; + private ClientConfiguration clientConf; + + public AccumuloMiniClusterConfiguration() { + ClusterType type = getClusterType(); + if (ClusterType.MINI != type) { + throw new IllegalStateException("Expected only to see mini cluster state"); + } + + this.conf = getConfiguration(type); + this.saslEnabled = 
TRUE.equals(System.getProperty(MiniClusterHarness.USE_KERBEROS_FOR_IT_OPTION)); + log.debug("SASL is {}enabled", (saslEnabled ? "" : "not ")); + } + + @Override + public String getAdminPrincipal() { + if (saslEnabled) { + return AccumuloClusterHarness.getKdc().getRootUser().getPrincipal(); + } else { + String principal = conf.get(ACCUMULO_MINI_PRINCIPAL_KEY); + if (null == principal) { + principal = ACCUMULO_MINI_PRINCIPAL_DEFAULT; + } + + return principal; + } + } + + @Override + public AuthenticationToken getAdminToken() { + if (saslEnabled) { + // Turn on Kerberos authentication so UGI acts properly + final Configuration conf = new Configuration(false); + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(conf); + + ClusterUser rootUser = AccumuloClusterHarness.getKdc().getRootUser(); + try { + return new KerberosToken(rootUser.getPrincipal(), rootUser.getKeytab(), true); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + String password = conf.get(ACCUMULO_MINI_PASSWORD_KEY); + if (null == password) { + password = ACCUMULO_MINI_PASSWORD_DEFAULT; + } + + return new PasswordToken(password); + } + } + + @Override + public ClusterType getClusterType() { + return ClusterType.MINI; + } + + @Override + public ClientConfiguration getClientConf() { + return clientConf; + } + + public void setClientConf(ClientConfiguration conf) { + Preconditions.checkNotNull(conf, "Client configuration was null"); + this.clientConf = conf; + } +}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java b/test/src/main/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java new file mode 100644 index 0000000..ba9dcef --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/harness/conf/StandaloneAccumuloClusterConfiguration.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.harness.conf; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.harness.AccumuloClusterHarness.ClusterType; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Extract connection information to a standalone Accumulo instance from Java properties + */ +public class StandaloneAccumuloClusterConfiguration extends AccumuloClusterPropertyConfiguration { + private static final Logger log = LoggerFactory.getLogger(StandaloneAccumuloClusterConfiguration.class); + + public static final String ACCUMULO_STANDALONE_ADMIN_PRINCIPAL_KEY = ACCUMULO_STANDALONE_PREFIX + "admin.principal"; + public static final String ACCUMULO_STANDALONE_ADMIN_PRINCIPAL_DEFAULT = "root"; + public static final String ACCUMULO_STANDALONE_PASSWORD_KEY = ACCUMULO_STANDALONE_PREFIX + "admin.password"; + public static final String ACCUMULO_STANDALONE_PASSWORD_DEFAULT = "rootPassword1"; + public static final String ACCUMULO_STANDALONE_ADMIN_KEYTAB_KEY = ACCUMULO_STANDALONE_PREFIX + "admin.keytab"; + public static final String ACCUMULO_STANDALONE_ZOOKEEPERS_KEY = ACCUMULO_STANDALONE_PREFIX + "zookeepers"; + public static final String 
ACCUMULO_STANDALONE_ZOOKEEPERS_DEFAULT = "localhost"; + public static final String ACCUMULO_STANDALONE_INSTANCE_NAME_KEY = ACCUMULO_STANDALONE_PREFIX + "instance.name"; + public static final String ACCUMULO_STANDALONE_INSTANCE_NAME_DEFAULT = "accumulo"; + public static final String ACCUMULO_STANDALONE_TMP_DIR_KEY = ACCUMULO_STANDALONE_PREFIX + "tmpdir"; + public static final String ACCUMULO_STANDALONE_TMP_DIR_DEFAULT = "/tmp"; + public static final String ACCUMULO_STANDALONE_SERVER_USER = ACCUMULO_STANDALONE_PREFIX + "server.user"; + public static final String ACCUMULO_STANDALONE_SERVER_USER_DEFAULT = "accumulo"; + + // A set of users we can use to connect to this instances + public static final String ACCUMULO_STANDALONE_USER_KEY = ACCUMULO_STANDALONE_PREFIX + "users."; + // Keytabs for the users + public static final String ACCUMULO_STANDALONE_USER_KEYTABS_KEY = ACCUMULO_STANDALONE_PREFIX + "keytabs."; + // Passwords for the users + public static final String ACCUMULO_STANDALONE_USER_PASSWORDS_KEY = ACCUMULO_STANDALONE_PREFIX + "passwords."; + + public static final String ACCUMULO_STANDALONE_HOME = ACCUMULO_STANDALONE_PREFIX + "home"; + public static final String ACCUMULO_STANDALONE_CLIENT_CONF = ACCUMULO_STANDALONE_PREFIX + "client.conf"; + public static final String ACCUMULO_STANDALONE_SERVER_CONF = ACCUMULO_STANDALONE_PREFIX + "server.conf"; + public static final String ACCUMULO_STANDALONE_HADOOP_CONF = ACCUMULO_STANDALONE_PREFIX + "hadoop.conf"; + + private Map<String,String> conf; + private String serverUser; + private File clientConfFile; + private ClientConfiguration clientConf; + private List<ClusterUser> clusterUsers; + + public StandaloneAccumuloClusterConfiguration(File clientConfFile) { + ClusterType type = getClusterType(); + if (ClusterType.STANDALONE != type) { + throw new IllegalStateException("Expected only to see standalone cluster state"); + } + + this.conf = getConfiguration(type); + this.clientConfFile = clientConfFile; + try { + 
this.clientConf = new ClientConfiguration(clientConfFile); + } catch (ConfigurationException e) { + throw new RuntimeException("Failed to load client configuration from " + clientConfFile); + } + // Update instance name if not already set + if (!clientConf.containsKey(ClientProperty.INSTANCE_NAME.getKey())) { + clientConf.withInstance(getInstanceName()); + } + // Update zookeeper hosts if not already set + if (!clientConf.containsKey(ClientProperty.INSTANCE_ZK_HOST.getKey())) { + clientConf.withZkHosts(getZooKeepers()); + } + + // The user Accumulo is running as + serverUser = conf.get(ACCUMULO_STANDALONE_SERVER_USER); + if (null == serverUser) { + serverUser = ACCUMULO_STANDALONE_SERVER_USER_DEFAULT; + } + + clusterUsers = new ArrayList<>(); + for (Entry<String,String> entry : conf.entrySet()) { + String key = entry.getKey(); + if (key.startsWith(ACCUMULO_STANDALONE_USER_KEY)) { + String suffix = key.substring(ACCUMULO_STANDALONE_USER_KEY.length()); + String keytab = conf.get(ACCUMULO_STANDALONE_USER_KEYTABS_KEY + suffix); + if (null != keytab) { + File keytabFile = new File(keytab); + assertTrue("Keytab doesn't exist: " + keytabFile, keytabFile.exists() && keytabFile.isFile()); + clusterUsers.add(new ClusterUser(entry.getValue(), keytabFile)); + } else { + String password = conf.get(ACCUMULO_STANDALONE_USER_PASSWORDS_KEY + suffix); + if (null == password) { + throw new IllegalArgumentException("Missing password or keytab configuration for user with offset " + suffix); + } + clusterUsers.add(new ClusterUser(entry.getValue(), password)); + } + } + } + log.info("Initialized Accumulo users with Kerberos keytabs: {}", clusterUsers); + } + + @Override + public String getAdminPrincipal() { + String principal = conf.get(ACCUMULO_STANDALONE_ADMIN_PRINCIPAL_KEY); + if (null == principal) { + principal = ACCUMULO_STANDALONE_ADMIN_PRINCIPAL_DEFAULT; + } + return principal; + } + + public String getPassword() { + String password = conf.get(ACCUMULO_STANDALONE_PASSWORD_KEY); + 
if (null == password) { + password = ACCUMULO_STANDALONE_PASSWORD_DEFAULT; + } + return password; + } + + public File getAdminKeytab() { + String keytabPath = conf.get(ACCUMULO_STANDALONE_ADMIN_KEYTAB_KEY); + if (null == keytabPath) { + throw new RuntimeException("SASL is enabled, but " + ACCUMULO_STANDALONE_ADMIN_KEYTAB_KEY + " was not provided"); + } + File keytab = new File(keytabPath); + if (!keytab.exists() || !keytab.isFile()) { + throw new RuntimeException(keytabPath + " should be a regular file"); + } + return keytab; + } + + @Override + public AuthenticationToken getAdminToken() { + if (clientConf.getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + File keytab = getAdminKeytab(); + try { + return new KerberosToken(getAdminPrincipal(), keytab, true); + } catch (IOException e) { + // The user isn't logged in + throw new RuntimeException("Failed to create KerberosToken", e); + } + } else { + return new PasswordToken(getPassword()); + } + } + + public String getZooKeepers() { + if (clientConf.containsKey(ClientProperty.INSTANCE_ZK_HOST.getKey())) { + return clientConf.get(ClientProperty.INSTANCE_ZK_HOST); + } + + String zookeepers = conf.get(ACCUMULO_STANDALONE_ZOOKEEPERS_KEY); + if (null == zookeepers) { + zookeepers = ACCUMULO_STANDALONE_ZOOKEEPERS_DEFAULT; + } + return zookeepers; + } + + public String getInstanceName() { + if (clientConf.containsKey(ClientProperty.INSTANCE_NAME.getKey())) { + return clientConf.get(ClientProperty.INSTANCE_NAME); + } + + String instanceName = conf.get(ACCUMULO_STANDALONE_INSTANCE_NAME_KEY); + if (null == instanceName) { + instanceName = ACCUMULO_STANDALONE_INSTANCE_NAME_DEFAULT; + } + return instanceName; + } + + public Instance getInstance() { + // Make sure the ZKI is created with the ClientConf so it gets things like SASL passed through to the connector + return new ZooKeeperInstance(clientConf); + } + + @Override + public ClusterType getClusterType() { + return ClusterType.STANDALONE; + } + + public 
String getHadoopConfDir() { + return conf.get(ACCUMULO_STANDALONE_HADOOP_CONF); + } + + public String getAccumuloHome() { + return conf.get(ACCUMULO_STANDALONE_HOME); + } + + public String getClientAccumuloConfDir() { + return conf.get(ACCUMULO_STANDALONE_CLIENT_CONF); + } + + public String getServerAccumuloConfDir() { + return conf.get(ACCUMULO_STANDALONE_SERVER_CONF); + } + + @Override + public ClientConfiguration getClientConf() { + return clientConf; + } + + public File getClientConfFile() { + return clientConfFile; + } + + public Path getTmpDirectory() { + String tmpDir = conf.get(ACCUMULO_STANDALONE_TMP_DIR_KEY); + if (null == tmpDir) { + tmpDir = ACCUMULO_STANDALONE_TMP_DIR_DEFAULT; + } + return new Path(tmpDir); + } + + public List<ClusterUser> getUsers() { + return Collections.unmodifiableList(clusterUsers); + } + + /** + * @return The user Accumulo is running as + */ + public String getAccumuloServerUser() { + return serverUser; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java new file mode 100644 index 0000000..a2f522e --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/AccumuloOutputFormatIT.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static com.google.common.base.Charsets.UTF_8; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordWriter; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import com.google.common.collect.Maps; + +/** + * Prevent regression of ACCUMULO-3709. Exists as a mini test because mock instance doesn't produce this error when dynamically changing the table permissions. 
+ */ +public class AccumuloOutputFormatIT { + + private static final String TABLE = "abc"; + private MiniAccumuloCluster accumulo; + private String secret = "secret"; + + @Rule + public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target")); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Before + public void setUp() throws Exception { + folder.create(); + MiniAccumuloConfig config = new MiniAccumuloConfig(folder.getRoot(), secret); + Map<String,String> configMap = Maps.newHashMap(); + configMap.put(Property.TSERV_SESSION_MAXIDLE.toString(), "1"); + config.setSiteConfig(configMap); + config.setNumTservers(1); + accumulo = new MiniAccumuloCluster(config); + accumulo.start(); + } + + @After + public void tearDown() throws Exception { + accumulo.stop(); + folder.delete(); + } + + @Test + public void testMapred() throws Exception { + ClientConfiguration clientConfig = accumulo.getClientConfig(); + ZooKeeperInstance instance = new ZooKeeperInstance(clientConfig); + Connector connector = instance.getConnector("root", new PasswordToken(secret)); + // create a table and put some data in it + connector.tableOperations().create(TABLE); + + JobConf job = new JobConf(); + BatchWriterConfig batchConfig = new BatchWriterConfig(); + // no flushes!!!!! + batchConfig.setMaxLatency(0, TimeUnit.MILLISECONDS); + // use a single thread to ensure our update session times out + batchConfig.setMaxWriteThreads(1); + // set the max memory so that we ensure we don't flush on the write. 
+ batchConfig.setMaxMemory(Long.MAX_VALUE); + AccumuloOutputFormat outputFormat = new AccumuloOutputFormat(); + AccumuloOutputFormat.setBatchWriterOptions(job, batchConfig); + AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig); + AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(secret)); + RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null); + + try { + for (int i = 0; i < 3; i++) { + Mutation m = new Mutation(new Text(String.format("%08d", i))); + for (int j = 0; j < 3; j++) { + m.put(new Text("cf1"), new Text("cq" + j), new Value((i + "_" + j).getBytes(UTF_8))); + writer.write(new Text(TABLE), m); + } + } + + } catch (Exception e) { + e.printStackTrace(); + // we don't want the exception to come from write + } + + connector.securityOperations().revokeTablePermission("root", TABLE, TablePermission.WRITE); + + exception.expect(IOException.class); + exception.expectMessage("PERMISSION_DENIED"); + writer.close(null); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java b/test/src/main/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java new file mode 100644 index 0000000..213ab59 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ArbitraryTablePropertiesIT.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.Map.Entry; + +import org.apache.accumulo.cluster.ClusterUser; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ArbitraryTablePropertiesIT extends SharedMiniClusterBase { + private static final Logger log = LoggerFactory.getLogger(ArbitraryTablePropertiesIT.class); + + @Override + protected int defaultTimeoutSeconds() { + return 30; + }; + + // Test set, get, and remove arbitrary table properties on the root account + @Test + public void setGetRemoveTablePropertyRoot() throws Exception { + log.debug("Starting setGetRemoveTablePropertyRoot test ------------------------"); + + // make a table + final String tableName = getUniqueNames(1)[0]; + final Connector conn = getConnector(); + conn.tableOperations().create(tableName); + + // Set variables for the property name to use and the initial value + String propertyName = "table.custom.description"; + String description1 = "Description"; + + // Make sure the property name is valid + Assert.assertTrue(Property.isValidPropertyKey(propertyName)); + // Set 
the property to the desired value + conn.tableOperations().setProperty(tableName, propertyName, description1); + + // Loop through properties to make sure the new property is added to the list + int count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description1)) + count++; + } + Assert.assertEquals(count, 1); + + // Set the property as something different + String description2 = "set second"; + conn.tableOperations().setProperty(tableName, propertyName, description2); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description2)) + count++; + } + Assert.assertEquals(count, 1); + + // Remove the property and make sure there is no longer a value associated with it + conn.tableOperations().removeProperty(tableName, propertyName); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property : conn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName)) + count++; + } + Assert.assertEquals(count, 0); + } + + // Tests set, get, and remove of user added arbitrary properties using a non-root account with permissions to alter tables + @Test + public void userSetGetRemoveTablePropertyWithPermission() throws Exception { + log.debug("Starting userSetGetRemoveTablePropertyWithPermission test ------------------------"); + + // Make a test username and password + ClusterUser user = getUser(0); + String testUser = user.getPrincipal(); + AuthenticationToken testToken = user.getToken(); + + // Create a root user and create the table + // Create a test user and grant that user permission to alter the table + final String tableName = 
getUniqueNames(1)[0]; + final Connector c = getConnector(); + c.securityOperations().createLocalUser(testUser, (testToken instanceof PasswordToken ? (PasswordToken) testToken : null)); + c.tableOperations().create(tableName); + c.securityOperations().grantTablePermission(testUser, tableName, TablePermission.ALTER_TABLE); + + // Set variables for the property name to use and the initial value + String propertyName = "table.custom.description"; + String description1 = "Description"; + + // Make sure the property name is valid + Assert.assertTrue(Property.isValidPropertyKey(propertyName)); + + // Getting a fresh token will ensure we're logged in as this user (if necessary) + Connector testConn = c.getInstance().getConnector(testUser, user.getToken()); + // Set the property to the desired value + testConn.tableOperations().setProperty(tableName, propertyName, description1); + + // Loop through properties to make sure the new property is added to the list + int count = 0; + for (Entry<String,String> property : testConn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description1)) + count++; + } + Assert.assertEquals(count, 1); + + // Set the property as something different + String description2 = "set second"; + testConn.tableOperations().setProperty(tableName, propertyName, description2); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property : testConn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName) && property.getValue().equals(description2)) + count++; + } + Assert.assertEquals(count, 1); + + // Remove the property and make sure there is no longer a value associated with it + testConn.tableOperations().removeProperty(tableName, propertyName); + + // / Loop through properties to make sure the new property is added to the list + count = 0; + for (Entry<String,String> property 
: testConn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName)) + count++; + } + Assert.assertEquals(count, 0); + + } + + // Tests set and get of user added arbitrary properties using a non-root account without permissions to alter tables + @Test + public void userSetGetTablePropertyWithoutPermission() throws Exception { + log.debug("Starting userSetGetTablePropertyWithoutPermission test ------------------------"); + + // Make a test username and password + ClusterUser user = getUser(1); + String testUser = user.getPrincipal(); + AuthenticationToken testToken = user.getToken(); + + // Create a root user and create the table + // Create a test user and grant that user permission to alter the table + final String tableName = getUniqueNames(1)[0]; + final Connector c = getConnector(); + c.securityOperations().createLocalUser(testUser, (testToken instanceof PasswordToken ? (PasswordToken) testToken : null)); + c.tableOperations().create(tableName); + + // Set variables for the property name to use and the initial value + String propertyName = "table.custom.description"; + String description1 = "Description"; + + // Make sure the property name is valid + Assert.assertTrue(Property.isValidPropertyKey(propertyName)); + + // Getting a fresh token will ensure we're logged in as this user (if necessary) + Connector testConn = c.getInstance().getConnector(testUser, user.getToken()); + + // Try to set the property to the desired value. 
+ // If able to set it, the test fails, since permission was never granted + try { + testConn.tableOperations().setProperty(tableName, propertyName, description1); + Assert.fail("Was able to set property without permissions"); + } catch (AccumuloSecurityException e) {} + + // Loop through properties to make sure the new property is not added to the list + int count = 0; + for (Entry<String,String> property : testConn.tableOperations().getProperties(tableName)) { + if (property.getKey().equals(propertyName)) + count++; + } + Assert.assertEquals(count, 0); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/AssignmentThreadsIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/AssignmentThreadsIT.java b/test/src/main/java/org/apache/accumulo/test/AssignmentThreadsIT.java new file mode 100644 index 0000000..c9a83a6 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/AssignmentThreadsIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertTrue; + +import java.util.Random; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +// ACCUMULO-1177 +public class AssignmentThreadsIT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT, "1"); + } + + // [0-9a-f] + private final static byte[] HEXCHARS = {0x30, 0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66}; + private final static Random random = new Random(); + + public static byte[] randomHex(int n) { + byte[] binary = new byte[n]; + byte[] hex = new byte[n * 2]; + random.nextBytes(binary); + int count = 0; + for (byte x : binary) { + hex[count++] = HEXCHARS[(x >> 4) & 0xf]; + hex[count++] = HEXCHARS[x & 0xf]; + } + return hex; + } + + @Test(timeout = 5 * 60 * 1000) + public void testConcurrentAssignmentPerformance() throws Exception { + // make a table with a lot of splits + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + log.info("Creating table"); + c.tableOperations().create(tableName); + SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < 1000; i++) { + splits.add(new Text(randomHex(8))); + } + log.info("Adding splits"); + c.tableOperations().addSplits(tableName, splits); + log.info("Taking table offline"); + c.tableOperations().offline(tableName, true); + // time how long it takes to load + log.info("Bringing the table online"); + long now = 
System.currentTimeMillis(); + c.tableOperations().online(tableName, true); + long diff = System.currentTimeMillis() - now; + log.info("Loaded " + splits.size() + " tablets in " + diff + " ms"); + c.instanceOperations().setProperty(Property.TSERV_ASSIGNMENT_MAXCONCURRENT.getKey(), "20"); + now = System.currentTimeMillis(); + log.info("Taking table offline, again"); + c.tableOperations().offline(tableName, true); + // wait >10 seconds for thread pool to update + UtilWaitThread.sleep(Math.max(0, now + 11 * 1000 - System.currentTimeMillis())); + now = System.currentTimeMillis(); + log.info("Bringing table back online"); + c.tableOperations().online(tableName, true); + long diff2 = System.currentTimeMillis() - now; + log.debug("Loaded " + splits.size() + " tablets in " + diff2 + " ms"); + assertTrue(diff2 < diff); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java b/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java new file mode 100644 index 0000000..1eb2373 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java @@ -0,0 +1,506 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.SystemPermission; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import 
org.apache.accumulo.server.security.AuditedSecurityOperation; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.LineIterator; +import org.apache.hadoop.io.Text; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Tests that Accumulo is outputting audit messages as expected. Since this is using MiniAccumuloCluster, it could take a while if we test everything in + * isolation. We test blocks of related operations, run the whole test in one MiniAccumulo instance, trying to clean up objects between each test. The + * MiniAccumuloClusterTest sets up the log4j stuff differently to an installed instance, instead piping everything through stdout and writing to a set location + * so we have to find the logs and grep the bits we need out. + */ +public class AuditMessageIT extends ConfigurableMacBase { + + private static final String AUDIT_USER_1 = "AuditUser1"; + private static final String AUDIT_USER_2 = "AuditUser2"; + private static final String PASSWORD = "password"; + private static final String OLD_TEST_TABLE_NAME = "apples"; + private static final String NEW_TEST_TABLE_NAME = "oranges"; + private static final String THIRD_TEST_TABLE_NAME = "pears"; + private static final Authorizations auths = new Authorizations("private", "public"); + + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Override + public void beforeClusterStart(MiniAccumuloConfigImpl cfg) throws Exception { + File f = new File(cfg.getConfDir(), "auditLog.xml"); + if (f.delete()) { + log.debug("Deleted " + f); + } + } + + // Must be static to survive Junit re-initialising the class every time. 
+ private static String lastAuditTimestamp; + private Connector auditConnector; + private Connector conn; + + private static ArrayList<String> findAuditMessage(ArrayList<String> input, String pattern) { + ArrayList<String> result = new ArrayList<String>(); + for (String s : input) { + if (s.matches(".*" + pattern + ".*")) + result.add(s); + } + return result; + } + + /** + * Returns a List of Audit messages that have been grep'd out of the MiniAccumuloCluster output. + * + * @param stepName + * A unique name for the test being executed, to identify the System.out messages. + * @return A List of the Audit messages, sorted (so in chronological order). + */ + private ArrayList<String> getAuditMessages(String stepName) throws IOException { + // ACCUMULO-3144 Make sure we give the processes enough time to flush the write buffer + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted waiting for data to be flushed to output streams"); + } + + for (MiniAccumuloClusterImpl.LogWriter lw : getCluster().getLogWriters()) { + lw.flush(); + } + + // Grab the audit messages + System.out.println("Start of captured audit messages for step " + stepName); + + ArrayList<String> result = new ArrayList<String>(); + File[] files = getCluster().getConfig().getLogDir().listFiles(); + assertNotNull(files); + for (File file : files) { + // We want to grab the files called .out + if (file.getName().contains(".out") && file.isFile() && file.canRead()) { + LineIterator it = FileUtils.lineIterator(file, UTF_8.name()); + try { + while (it.hasNext()) { + String line = it.nextLine(); + if (line.matches(".* \\[" + AuditedSecurityOperation.AUDITLOG + "\\s*\\].*")) { + // Only include the message if startTimestamp is null, 
or the message occurred after the startTimestamp value + if ((lastAuditTimestamp == null) || (line.substring(0, 23).compareTo(lastAuditTimestamp) > 0)) + result.add(line); + } + } + } finally { + LineIterator.closeQuietly(it); + } + } + } + Collections.sort(result); + + for (String s : result) { + System.out.println(s); + } + System.out.println("End of captured audit messages for step " + stepName); + if (result.size() > 0) + lastAuditTimestamp = (result.get(result.size() - 1)).substring(0, 23); + + return result; + } + + private void grantEverySystemPriv(Connector conn, String user) throws AccumuloSecurityException, AccumuloException { + SystemPermission[] arrayOfP = new SystemPermission[] {SystemPermission.SYSTEM, SystemPermission.ALTER_TABLE, SystemPermission.ALTER_USER, + SystemPermission.CREATE_TABLE, SystemPermission.CREATE_USER, SystemPermission.DROP_TABLE, SystemPermission.DROP_USER}; + for (SystemPermission p : arrayOfP) { + conn.securityOperations().grantSystemPermission(user, p); + } + } + + @Before + public void resetInstance() throws Exception { + conn = getConnector(); + + removeUsersAndTables(); + + // This will set the lastAuditTimestamp for the first test + getAuditMessages("setup"); + } + + @After + public void removeUsersAndTables() throws Exception { + for (String user : Arrays.asList(AUDIT_USER_1, AUDIT_USER_2)) { + if (conn.securityOperations().listLocalUsers().contains(user)) { + conn.securityOperations().dropLocalUser(user); + } + } + + TableOperations tops = conn.tableOperations(); + for (String table : Arrays.asList(THIRD_TEST_TABLE_NAME, NEW_TEST_TABLE_NAME, OLD_TEST_TABLE_NAME)) { + if (tops.exists(table)) { + tops.delete(table); + } + } + } + + @Test + public void testTableOperationsAudits() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, IOException, + InterruptedException { + + conn.securityOperations().createLocalUser(AUDIT_USER_1, new PasswordToken(PASSWORD)); + 
conn.securityOperations().grantSystemPermission(AUDIT_USER_1, SystemPermission.SYSTEM); + conn.securityOperations().grantSystemPermission(AUDIT_USER_1, SystemPermission.CREATE_TABLE); + + // Connect as Audit User and do a bunch of stuff. + // Testing activity begins here + auditConnector = getCluster().getConnector(AUDIT_USER_1, new PasswordToken(PASSWORD)); + auditConnector.tableOperations().create(OLD_TEST_TABLE_NAME); + auditConnector.tableOperations().rename(OLD_TEST_TABLE_NAME, NEW_TEST_TABLE_NAME); + Map<String,String> emptyMap = Collections.emptyMap(); + Set<String> emptySet = Collections.emptySet(); + auditConnector.tableOperations().clone(NEW_TEST_TABLE_NAME, OLD_TEST_TABLE_NAME, true, emptyMap, emptySet); + auditConnector.tableOperations().delete(OLD_TEST_TABLE_NAME); + auditConnector.tableOperations().offline(NEW_TEST_TABLE_NAME); + auditConnector.tableOperations().delete(NEW_TEST_TABLE_NAME); + // Testing activity ends here + + ArrayList<String> auditMessages = getAuditMessages("testTableOperationsAudits"); + + assertEquals(1, findAuditMessage(auditMessages, "action: createTable; targetTable: " + OLD_TEST_TABLE_NAME).size()); + assertEquals(1, findAuditMessage(auditMessages, "action: renameTable; targetTable: " + OLD_TEST_TABLE_NAME).size()); + assertEquals(1, findAuditMessage(auditMessages, "action: cloneTable; targetTable: " + NEW_TEST_TABLE_NAME).size()); + assertEquals(1, findAuditMessage(auditMessages, "action: deleteTable; targetTable: " + OLD_TEST_TABLE_NAME).size()); + assertEquals(1, findAuditMessage(auditMessages, "action: offlineTable; targetTable: " + NEW_TEST_TABLE_NAME).size()); + assertEquals(1, findAuditMessage(auditMessages, "action: deleteTable; targetTable: " + NEW_TEST_TABLE_NAME).size()); + + } + + @Test + public void testUserOperationsAudits() throws AccumuloSecurityException, AccumuloException, TableExistsException, InterruptedException, IOException { + + conn.securityOperations().createLocalUser(AUDIT_USER_1, new 
PasswordToken(PASSWORD)); + conn.securityOperations().grantSystemPermission(AUDIT_USER_1, SystemPermission.SYSTEM); + conn.securityOperations().grantSystemPermission(AUDIT_USER_1, SystemPermission.CREATE_USER); + grantEverySystemPriv(conn, AUDIT_USER_1); + + // Connect as Audit User and do a bunch of stuff. + // Start testing activities here + auditConnector = getCluster().getConnector(AUDIT_USER_1, new PasswordToken(PASSWORD)); + auditConnector.securityOperations().createLocalUser(AUDIT_USER_2, new PasswordToken(PASSWORD)); + + // It seems only root can grant stuff. + conn.securityOperations().grantSystemPermission(AUDIT_USER_2, SystemPermission.ALTER_TABLE); + conn.securityOperations().revokeSystemPermission(AUDIT_USER_2, SystemPermission.ALTER_TABLE); + auditConnector.tableOperations().create(NEW_TEST_TABLE_NAME); + conn.securityOperations().grantTablePermission(AUDIT_USER_2, NEW_TEST_TABLE_NAME, TablePermission.READ); + conn.securityOperations().revokeTablePermission(AUDIT_USER_2, NEW_TEST_TABLE_NAME, TablePermission.READ); + auditConnector.securityOperations().changeLocalUserPassword(AUDIT_USER_2, new PasswordToken("anything")); + auditConnector.securityOperations().changeUserAuthorizations(AUDIT_USER_2, auths); + auditConnector.securityOperations().dropLocalUser(AUDIT_USER_2); + // Stop testing activities here + + ArrayList<String> auditMessages = getAuditMessages("testUserOperationsAudits"); + + assertEquals(1, findAuditMessage(auditMessages, "action: createUser; targetUser: " + AUDIT_USER_2).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "action: grantSystemPermission; permission: " + SystemPermission.ALTER_TABLE.toString() + "; targetUser: " + AUDIT_USER_2).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "action: revokeSystemPermission; permission: " + SystemPermission.ALTER_TABLE.toString() + "; targetUser: " + AUDIT_USER_2).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "action: 
grantTablePermission; permission: " + TablePermission.READ.toString() + "; targetTable: " + NEW_TEST_TABLE_NAME).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "action: revokeTablePermission; permission: " + TablePermission.READ.toString() + "; targetTable: " + NEW_TEST_TABLE_NAME).size()); + assertEquals(1, findAuditMessage(auditMessages, "action: changePassword; targetUser: " + AUDIT_USER_2 + "").size()); + assertEquals(1, findAuditMessage(auditMessages, "action: changeAuthorizations; targetUser: " + AUDIT_USER_2 + "; authorizations: " + auths.toString()) + .size()); + assertEquals(1, findAuditMessage(auditMessages, "action: dropUser; targetUser: " + AUDIT_USER_2).size()); + } + + @Test + public void testImportExportOperationsAudits() throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException, + IOException, InterruptedException { + + conn.securityOperations().createLocalUser(AUDIT_USER_1, new PasswordToken(PASSWORD)); + conn.securityOperations().grantSystemPermission(AUDIT_USER_1, SystemPermission.SYSTEM); + conn.securityOperations().changeUserAuthorizations(AUDIT_USER_1, auths); + grantEverySystemPriv(conn, AUDIT_USER_1); + + // Connect as Audit User and do a bunch of stuff. 
+ // Start testing activities here + auditConnector = getCluster().getConnector(AUDIT_USER_1, new PasswordToken(PASSWORD)); + auditConnector.tableOperations().create(OLD_TEST_TABLE_NAME); + + // Insert some play data + BatchWriter bw = auditConnector.createBatchWriter(OLD_TEST_TABLE_NAME, new BatchWriterConfig()); + Mutation m = new Mutation("myRow"); + m.put("cf1", "cq1", "v1"); + m.put("cf1", "cq2", "v3"); + bw.addMutation(m); + bw.close(); + + // Prepare to export the table + File exportDir = new File(getCluster().getConfig().getDir().toString() + "/export"); + + auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME); + auditConnector.tableOperations().exportTable(OLD_TEST_TABLE_NAME, exportDir.toString()); + + // We've exported the table metadata to the MiniAccumuloCluster root dir. Grab the .rf file path to re-import it + File distCpTxt = new File(exportDir.toString() + "/distcp.txt"); + File importFile = null; + LineIterator it = FileUtils.lineIterator(distCpTxt, UTF_8.name()); + + // Just grab the first rf file, it will do for now. + String filePrefix = "file:"; + try { + while (it.hasNext() && importFile == null) { + String line = it.nextLine(); + if (line.matches(".*\\.rf")) { + importFile = new File(line.replaceFirst(filePrefix, "")); + } + } + } finally { + LineIterator.closeQuietly(it); + } + FileUtils.copyFileToDirectory(importFile, exportDir); + auditConnector.tableOperations().importTable(NEW_TEST_TABLE_NAME, exportDir.toString()); + + // Now do a Directory (bulk) import of the same data. 
+ auditConnector.tableOperations().create(THIRD_TEST_TABLE_NAME); + File failDir = new File(exportDir + "/tmp"); + assertTrue(failDir.mkdirs() || failDir.isDirectory()); + auditConnector.tableOperations().importDirectory(THIRD_TEST_TABLE_NAME, exportDir.toString(), failDir.toString(), false); + auditConnector.tableOperations().online(OLD_TEST_TABLE_NAME); + + // Stop testing activities here + + ArrayList<String> auditMessages = getAuditMessages("testImportExportOperationsAudits"); + + assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_CREATE_TABLE_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME)).size()); + assertEquals(1, + findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_ONLINE_OFFLINE_TABLE_AUDIT_TEMPLATE, "offlineTable", OLD_TEST_TABLE_NAME)) + .size()); + assertEquals(1, + findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_EXPORT_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME, exportDir.toString())).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + String.format(AuditedSecurityOperation.CAN_IMPORT_AUDIT_TEMPLATE, NEW_TEST_TABLE_NAME, filePrefix + exportDir.toString())).size()); + assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_CREATE_TABLE_AUDIT_TEMPLATE, THIRD_TEST_TABLE_NAME)).size()); + assertEquals( + 1, + findAuditMessage( + auditMessages, + String.format(AuditedSecurityOperation.CAN_BULK_IMPORT_AUDIT_TEMPLATE, THIRD_TEST_TABLE_NAME, filePrefix + exportDir.toString(), filePrefix + + failDir.toString())).size()); + assertEquals(1, + findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_ONLINE_OFFLINE_TABLE_AUDIT_TEMPLATE, "onlineTable", OLD_TEST_TABLE_NAME)) + .size()); + + } + + @Test + public void testDataOperationsAudits() throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException, IOException, + InterruptedException { + + 
conn.securityOperations().createLocalUser(AUDIT_USER_1, new PasswordToken(PASSWORD)); + conn.securityOperations().grantSystemPermission(AUDIT_USER_1, SystemPermission.SYSTEM); + conn.securityOperations().changeUserAuthorizations(AUDIT_USER_1, auths); + grantEverySystemPriv(conn, AUDIT_USER_1); + + // Connect as Audit User and do a bunch of stuff. + // Start testing activities here + auditConnector = getCluster().getConnector(AUDIT_USER_1, new PasswordToken(PASSWORD)); + auditConnector.tableOperations().create(OLD_TEST_TABLE_NAME); + + // Insert some play data + BatchWriter bw = auditConnector.createBatchWriter(OLD_TEST_TABLE_NAME, new BatchWriterConfig()); + Mutation m = new Mutation("myRow"); + m.put("cf1", "cq1", "v1"); + m.put("cf1", "cq2", "v3"); + bw.addMutation(m); + bw.close(); + + // Start testing activities here + // A regular scan + Scanner scanner = auditConnector.createScanner(OLD_TEST_TABLE_NAME, auths); + for (Map.Entry<Key,Value> entry : scanner) { + System.out.println("Scanner row: " + entry.getKey() + " " + entry.getValue()); + } + scanner.close(); + + // A batch scan + BatchScanner bs = auditConnector.createBatchScanner(OLD_TEST_TABLE_NAME, auths, 1); + bs.fetchColumn(new Text("cf1"), new Text("cq1")); + bs.setRanges(Arrays.asList(new Range("myRow", "myRow~"))); + + for (Map.Entry<Key,Value> entry : bs) { + System.out.println("BatchScanner row: " + entry.getKey() + " " + entry.getValue()); + } + bs.close(); + + // Delete some data. 
+ auditConnector.tableOperations().deleteRows(OLD_TEST_TABLE_NAME, new Text("myRow"), new Text("myRow~")); + + // End of testing activities + + ArrayList<String> auditMessages = getAuditMessages("testDataOperationsAudits"); + assertTrue(1 <= findAuditMessage(auditMessages, "action: scan; targetTable: " + OLD_TEST_TABLE_NAME).size()); + assertTrue(1 <= findAuditMessage(auditMessages, "action: scan; targetTable: " + OLD_TEST_TABLE_NAME).size()); + assertEquals(1, + findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_DELETE_RANGE_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME, "myRow", "myRow~")).size()); + + } + + @Test + public void testDeniedAudits() throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException, IOException, + InterruptedException { + + // Create our user with no privs + conn.securityOperations().createLocalUser(AUDIT_USER_1, new PasswordToken(PASSWORD)); + conn.tableOperations().create(OLD_TEST_TABLE_NAME); + auditConnector = getCluster().getConnector(AUDIT_USER_1, new PasswordToken(PASSWORD)); + + // Start testing activities + // We should get denied or / failed audit messages here. + // We don't want the thrown exceptions to stop our tests, and we are not testing that the Exceptions are thrown. 
+ + try { + auditConnector.tableOperations().create(NEW_TEST_TABLE_NAME); + } catch (AccumuloSecurityException ex) {} + try { + auditConnector.tableOperations().rename(OLD_TEST_TABLE_NAME, NEW_TEST_TABLE_NAME); + } catch (AccumuloSecurityException ex) {} + try { + auditConnector.tableOperations().clone(OLD_TEST_TABLE_NAME, NEW_TEST_TABLE_NAME, true, Collections.<String,String> emptyMap(), + Collections.<String> emptySet()); + } catch (AccumuloSecurityException ex) {} + try { + auditConnector.tableOperations().delete(OLD_TEST_TABLE_NAME); + } catch (AccumuloSecurityException ex) {} + try { + auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME); + } catch (AccumuloSecurityException ex) {} + try { + Scanner scanner = auditConnector.createScanner(OLD_TEST_TABLE_NAME, auths); + scanner.iterator().next().getKey(); + } catch (RuntimeException ex) {} + try { + auditConnector.tableOperations().deleteRows(OLD_TEST_TABLE_NAME, new Text("myRow"), new Text("myRow~")); + } catch (AccumuloSecurityException ex) {} + + // ... that will do for now. 
+ // End of testing activities + + ArrayList<String> auditMessages = getAuditMessages("testDeniedAudits"); + assertEquals(1, + findAuditMessage(auditMessages, "operation: denied;.*" + String.format(AuditedSecurityOperation.CAN_CREATE_TABLE_AUDIT_TEMPLATE, NEW_TEST_TABLE_NAME)) + .size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "operation: denied;.*" + String.format(AuditedSecurityOperation.CAN_RENAME_TABLE_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME, NEW_TEST_TABLE_NAME)).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "operation: denied;.*" + String.format(AuditedSecurityOperation.CAN_CLONE_TABLE_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME, NEW_TEST_TABLE_NAME)).size()); + assertEquals(1, + findAuditMessage(auditMessages, "operation: denied;.*" + String.format(AuditedSecurityOperation.CAN_DELETE_TABLE_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME)) + .size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "operation: denied;.*" + String.format(AuditedSecurityOperation.CAN_ONLINE_OFFLINE_TABLE_AUDIT_TEMPLATE, "offlineTable", OLD_TEST_TABLE_NAME)) + .size()); + assertEquals(1, findAuditMessage(auditMessages, "operation: denied;.*" + "action: scan; targetTable: " + OLD_TEST_TABLE_NAME).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + "operation: denied;.*" + String.format(AuditedSecurityOperation.CAN_DELETE_RANGE_AUDIT_TEMPLATE, OLD_TEST_TABLE_NAME, "myRow", "myRow~")).size()); + } + + @Test + public void testFailedAudits() throws AccumuloSecurityException, AccumuloException, TableExistsException, TableNotFoundException, IOException, + InterruptedException { + + // Start testing activities + // Test that we get a few "failed" audit messages come through when we tell it to do dumb stuff + // We don't want the thrown exceptions to stop our tests, and we are not testing that the Exceptions are thrown. 
+ try { + conn.securityOperations().dropLocalUser(AUDIT_USER_2); + } catch (AccumuloSecurityException ex) {} + try { + conn.securityOperations().revokeSystemPermission(AUDIT_USER_2, SystemPermission.ALTER_TABLE); + } catch (AccumuloSecurityException ex) {} + try { + conn.securityOperations().createLocalUser("root", new PasswordToken("super secret")); + } catch (AccumuloSecurityException ex) {} + ArrayList<String> auditMessages = getAuditMessages("testFailedAudits"); + // ... that will do for now. + // End of testing activities + + assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.DROP_USER_AUDIT_TEMPLATE, AUDIT_USER_2)).size()); + assertEquals( + 1, + findAuditMessage(auditMessages, + String.format(AuditedSecurityOperation.REVOKE_SYSTEM_PERMISSION_AUDIT_TEMPLATE, SystemPermission.ALTER_TABLE, AUDIT_USER_2)).size()); + assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CREATE_USER_AUDIT_TEMPLATE, "root", "")).size()); + + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java new file mode 100644 index 0000000..5b0b84d --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.apache.accumulo.fate.zookeeper.ZooLock; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// Accumulo3047 +public class BadDeleteMarkersCreatedIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(BadDeleteMarkersCreatedIT.class); + + @Override + public int 
defaultTimeoutSeconds() { + return 120; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(1); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.setProperty(Property.GC_CYCLE_START, "0s"); + } + + private int timeoutFactor = 1; + + @Before + public void getTimeoutFactor() { + try { + timeoutFactor = Integer.parseInt(System.getProperty("timeout.factor")); + } catch (NumberFormatException e) { + log.warn("Could not parse integer from timeout.factor"); + } + + Assert.assertTrue("timeout.factor must be greater than or equal to 1", timeoutFactor >= 1); + } + + private String gcCycleDelay, gcCycleStart; + + @Before + public void alterConfig() throws Exception { + InstanceOperations iops = getConnector().instanceOperations(); + Map<String,String> config = iops.getSystemConfiguration(); + gcCycleDelay = config.get(Property.GC_CYCLE_DELAY.getKey()); + gcCycleStart = config.get(Property.GC_CYCLE_START.getKey()); + iops.setProperty(Property.GC_CYCLE_DELAY.getKey(), "1s"); + iops.setProperty(Property.GC_CYCLE_START.getKey(), "0s"); + log.info("Restarting garbage collector"); + + getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); + + Instance instance = getConnector().getInstance(); + ZooCache zcache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + zcache.clear(); + String path = ZooUtil.getRoot(instance) + Constants.ZGC_LOCK; + byte[] gcLockData; + do { + gcLockData = ZooLock.getLockData(zcache, path, null); + if (null != gcLockData) { + log.info("Waiting for GC ZooKeeper lock to expire"); + Thread.sleep(2000); + } + } while (null != gcLockData); + + log.info("GC lock was lost"); + + getCluster().getClusterControl().startAllServers(ServerType.GARBAGE_COLLECTOR); + log.info("Garbage collector was restarted"); + + gcLockData = null; + do { + gcLockData = ZooLock.getLockData(zcache, path, null); + if (null == gcLockData) { + 
log.info("Waiting for GC ZooKeeper lock to be acquired"); + Thread.sleep(2000); + } + } while (null == gcLockData); + + log.info("GC lock was acquired"); + } + + @After + public void restoreConfig() throws Exception { + InstanceOperations iops = getConnector().instanceOperations(); + if (null != gcCycleDelay) { + iops.setProperty(Property.GC_CYCLE_DELAY.getKey(), gcCycleDelay); + } + if (null != gcCycleStart) { + iops.setProperty(Property.GC_CYCLE_START.getKey(), gcCycleStart); + } + log.info("Restarting garbage collector"); + getCluster().getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); + getCluster().getClusterControl().startAllServers(ServerType.GARBAGE_COLLECTOR); + log.info("Garbage collector was restarted"); + } + + @Test + public void test() throws Exception { + // make a table + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + log.info("Creating table to be deleted"); + c.tableOperations().create(tableName); + final String tableId = c.tableOperations().tableIdMap().get(tableName); + Assert.assertNotNull("Expected to find a tableId", tableId); + + // add some splits + SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < 10; i++) { + splits.add(new Text("" + i)); + } + c.tableOperations().addSplits(tableName, splits); + // get rid of all the splits + c.tableOperations().deleteRows(tableName, null, null); + // get rid of the table + c.tableOperations().delete(tableName); + log.info("Sleeping to let garbage collector run"); + // let gc run + UtilWaitThread.sleep(timeoutFactor * 15 * 1000); + log.info("Verifying that delete markers were deleted"); + // look for delete markers + Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(MetadataSchema.DeletesSection.getRange()); + for (Entry<Key,Value> entry : scanner) { + String row = entry.getKey().getRow().toString(); + if (!row.contains("/" + tableId + "/")) { + log.info("Ignoring delete entry for a table other than 
the one we deleted"); + continue; + } + Assert.fail("Delete entry should have been deleted by the garbage collector: " + entry.getKey().getRow().toString()); + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java new file mode 100644 index 0000000..bf9f5f0 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BalanceFasterIT.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +// ACCUMULO-2952 +public class BalanceFasterIT extends ConfigurableMacBase { + + @Override + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setNumTservers(3); + } + + @Test(timeout = 90 * 1000) + public void test() throws Exception { + // create a table, add a bunch of splits + String tableName = getUniqueNames(1)[0]; + Connector conn = getConnector(); + conn.tableOperations().create(tableName); + SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < 1000; i++) { + splits.add(new Text("" + i)); + } + conn.tableOperations().addSplits(tableName, splits); + // give a short wait for balancing + UtilWaitThread.sleep(10 * 1000); + // find out where the tabets are + Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); + s.setRange(MetadataSchema.TabletsSection.getRange()); + Map<String,Integer> counts = new HashMap<String,Integer>(); + while (true) { + int total = 0; + counts.clear(); + for (Entry<Key,Value> kv : 
s) { + String host = kv.getValue().toString(); + if (!counts.containsKey(host)) + counts.put(host, 0); + counts.put(host, counts.get(host) + 1); + total++; + } + // are enough tablets online? + if (total > 1000) + break; + } + // should be on all three servers + assertTrue(counts.size() == 3); + // and distributed evenly + Iterator<Integer> i = counts.values().iterator(); + int a = i.next(); + int b = i.next(); + int c = i.next(); + assertTrue(Math.abs(a - b) < 3); + assertTrue(Math.abs(a - c) < 3); + assertTrue(a > 330); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/BalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java new file mode 100644 index 0000000..605ac94 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.io.Text; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BalanceIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(BalanceIT.class); + + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Test + public void testBalance() throws Exception { + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + log.info("Creating table"); + c.tableOperations().create(tableName); + SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 0; i < 10; i++) { + splits.add(new Text("" + i)); + } + log.info("Adding splits"); + c.tableOperations().addSplits(tableName, splits); + log.info("Waiting for balance"); + c.instanceOperations().waitForBalance(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java new file mode 100644 index 0000000..9acefc4 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +// ACCUMULO-3692 +public class BalanceWithOfflineTableIT extends ConfigurableMacBase { + + @Override + protected int defaultTimeoutSeconds() { + return 30; + } + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {} + + @Test + public void test() throws Exception { + final String tableNames[] = getUniqueNames(2); + final String tableName = tableNames[0]; + // create a table with a bunch of splits + + final Connector c = getConnector(); + log.info("Creating table " + tableName); + c.tableOperations().create(tableName); + ; + final SortedSet<Text> splits = new TreeSet<>(); + for (String split : "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",")) { + splits.add(new Text(split)); + } + log.info("Splitting table " + tableName); + c.tableOperations().addSplits(tableName, splits); + log.info("Balancing"); + c.instanceOperations().waitForBalance(); + log.info("Balanced"); + + // create a new table which will unbalance the cluster + final String 
table2 = tableNames[1]; + log.info("Creating table " + table2); + c.tableOperations().create(table2); + log.info("Creating splits " + table2); + c.tableOperations().addSplits(table2, splits); + + // offline the table, hopefully while there are some migrations going on + log.info("Offlining " + table2); + c.tableOperations().offline(table2, true); + log.info("Offlined " + table2); + + log.info("Waiting for balance"); + + SimpleThreadPool pool = new SimpleThreadPool(1, "waitForBalance"); + Future<Boolean> wait = pool.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + c.instanceOperations().waitForBalance(); + return true; + } + }); + wait.get(20, TimeUnit.SECONDS); + log.info("Balance succeeded with an offline table"); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java b/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java new file mode 100644 index 0000000..11fc595 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BatchWriterIT.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.junit.Test; + +public class BatchWriterIT extends AccumuloClusterHarness { + + @Override + public int defaultTimeoutSeconds() { + return 30; + } + + @Test + public void test() throws Exception { + // call the batchwriter with buffer of size zero + String table = getUniqueNames(1)[0]; + Connector c = getConnector(); + c.tableOperations().create(table); + BatchWriterConfig config = new BatchWriterConfig(); + config.setMaxMemory(0); + BatchWriter writer = c.createBatchWriter(table, config); + Mutation m = new Mutation("row"); + m.put("cf", "cq", new Value("value".getBytes())); + writer.addMutation(m); + writer.close(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/01ae5b85/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java new file mode 100644 index 0000000..ce60893 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/BulkImportVolumeIT.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; + +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsShell; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// ACCUMULO-118/ACCUMULO-2504 +public class BulkImportVolumeIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(BulkImportVolumeIT.class); + + File volDirBase = null; + Path v1, v2; + + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + File baseDir = cfg.getDir(); + volDirBase = new File(baseDir, "volumes"); + File v1f = new File(volDirBase, "v1"); + File v2f = new File(volDirBase, "v2"); + v1 = new Path("file://" + v1f.getAbsolutePath()); + v2 = new Path("file://" + v2f.getAbsolutePath()); + + // Run MAC on two locations in the local file system + cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString()); + + // use raw local file system so walogs sync and flush will work + 
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test + public void testBulkImportFailure() throws Exception { + String tableName = getUniqueNames(1)[0]; + TableOperations to = getConnector().tableOperations(); + to.create(tableName); + FileSystem fs = getFileSystem(); + Path rootPath = new Path(cluster.getTemporaryPath(), getClass().getName()); + Path bulk = new Path(rootPath, "bulk"); + log.info("bulk: {}", bulk); + if (fs.exists(bulk)) { + fs.delete(bulk, true); + } + assertTrue(fs.mkdirs(bulk)); + Path err = new Path(rootPath, "err"); + log.info("err: {}", err); + if (fs.exists(err)) { + fs.delete(err, true); + } + assertTrue(fs.mkdirs(err)); + Path bogus = new Path(bulk, "bogus.rf"); + fs.create(bogus).close(); + log.info("bogus: {}", bogus); + assertTrue(fs.exists(bogus)); + FsShell fsShell = new FsShell(fs.getConf()); + assertEquals("Failed to chmod " + rootPath, 0, fsShell.run(new String[] {"-chmod", "-R", "777", rootPath.toString()})); + log.info("Importing {} into {} with failures directory {}", bulk, tableName, err); + to.importDirectory(tableName, bulk.toString(), err.toString(), false); + assertEquals(1, fs.listStatus(err).length); + } + +}