This is an automated email from the ASF dual-hosted git repository. mwalch pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new 2a58165 ACCUMULO-4784 - New fluent API for creating Connector (#361) 2a58165 is described below commit 2a58165c0cccb607b10b5162075a912983fb2a67 Author: Mike Walch <mwa...@apache.org> AuthorDate: Tue Feb 13 13:48:43 2018 -0500 ACCUMULO-4784 - New fluent API for creating Connector (#361) * Replaces ClientConfiguration which is now deprecated * New API can be passed a file with properties set. An example file 'accumulo-client.properties' is in the tarball distribution. This file should be used in place of client.conf * Build auto-generates accumulo-client.properties for tarball and client-properties.md for documentation on website * Fluent API can also generate ConnectionInfo that can be passed to MapReduce input & output libraries for Accumulo. --- assemble/conf/client.conf | 20 -- assemble/pom.xml | 15 + assemble/src/main/assemblies/component.xml | 8 + core/pom.xml | 15 + .../accumulo/core/client/BatchWriterConfig.java | 30 ++ .../accumulo/core/client/ClientConfiguration.java | 1 + .../accumulo/core/client/ConnectionInfo.java | 54 ++++ .../org/apache/accumulo/core/client/Connector.java | 323 ++++++++++++++++++++- .../accumulo/core/client/impl/ClientContext.java | 23 +- .../core/client/impl/ConnectionInfoFactory.java | 120 ++++++++ .../core/client/impl/ConnectionInfoImpl.java | 63 ++++ .../accumulo/core/client/impl/ConnectorImpl.java | 257 +++++++++++++++- .../core/client/mapred/AbstractInputFormat.java | 24 +- .../core/client/mapred/AccumuloInputFormat.java | 8 +- .../mapred/AccumuloMultiTableInputFormat.java | 3 +- .../core/client/mapred/AccumuloOutputFormat.java | 29 +- .../core/client/mapred/AccumuloRowInputFormat.java | 6 +- .../core/client/mapreduce/AbstractInputFormat.java | 22 +- .../core/client/mapreduce/AccumuloInputFormat.java | 7 +- .../client/mapreduce/AccumuloOutputFormat.java | 26 +- .../client/mapreduce/AccumuloRowInputFormat.java | 6 +- .../accumulo/core/client/mock/MockConnector.java | 15 + .../security/tokens/CredentialProviderToken.java | 20 +- .../accumulo/core/conf/ClientConfigGenerate.java | 193 ++++++++++++ .../apache/accumulo/core/conf/ClientProperty.java | 133 +++++++++ .../accumulo/core/conf/ConfigurationDocGen.java | 3 +- .../core/client/BatchWriterConfigTest.java | 19 ++ .../tokens/CredentialProviderTokenTest.java | 2 + .../accumulo/harness/AccumuloClusterHarness.java | 6 + .../test/functional/ConfigurableMacBase.java | 5 + .../accumulo/test/functional/ConnectorIT.java | 63 ++++ .../test/mapred/AccumuloInputFormatIT.java | 6 +- .../test/mapred/AccumuloOutputFormatIT.java | 16 +- .../test/mapreduce/AccumuloInputFormatIT.java | 7 +- .../test/mapreduce/AccumuloOutputFormatIT.java | 6 +- 35 files changed, 1465 insertions(+), 89 deletions(-) diff --git a/assemble/conf/client.conf b/assemble/conf/client.conf deleted file mode 100644 index 5256b13..0000000 --- a/assemble/conf/client.conf +++ /dev/null @@ -1,20 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# instance.zookeeper.host=localhost:2181 -# instance.rpc.ssl.enabled=false - -# instance.rcp.sasl.enabled=false -# rpc.sasl.qop=auth diff --git a/assemble/pom.xml b/assemble/pom.xml index 4167de3..9a9d94d 100644 --- a/assemble/pom.xml +++ b/assemble/pom.xml @@ -384,6 +384,21 @@ <executable>${basedir}/src/main/scripts/generate-versions-listing.sh</executable> </configuration> </execution> + <execution> + <id>client-props-file</id> + <goals> + <goal>java</goal> + </goals> + <phase>prepare-package</phase> + <configuration> + <mainClass>org.apache.accumulo.core.conf.ClientConfigGenerate</mainClass> + <classpathScope>test</classpathScope> + <arguments> + <argument>--generate-config</argument> + <argument>${project.build.directory}/accumulo-client.properties</argument> + </arguments> + </configuration> + </execution> </executions> </plugin> <plugin> diff --git a/assemble/src/main/assemblies/component.xml b/assemble/src/main/assemblies/component.xml index 405b7b3..98e3b9c 100644 --- a/assemble/src/main/assemblies/component.xml +++ b/assemble/src/main/assemblies/component.xml @@ -117,6 +117,14 @@ <directoryMode>0755</directoryMode> <fileMode>0644</fileMode> </fileSet> + <fileSet> + <directory>target</directory> + <outputDirectory>conf</outputDirectory> + <fileMode>0644</fileMode> + <includes> + <include>accumulo-client.properties</include> + </includes> + </fileSet> <!-- Lift generated thrift proxy code into its own directory --> <fileSet> <directory>../proxy/target</directory> diff --git a/core/pom.xml b/core/pom.xml index 16fa16f..fe89b6b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -291,6 +291,21 @@ </arguments> </configuration> </execution> + <execution> + <id>client-props-markdown</id> + <goals> + <goal>java</goal> + </goals> + <phase>package</phase> + <configuration> + <mainClass>org.apache.accumulo.core.conf.ClientConfigGenerate</mainClass> + <classpathScope>test</classpathScope> + <arguments> + <argument>--generate-markdown</argument> + <argument>${project.build.directory}/generated-docs/client-properties.md</argument> + </arguments> + </configuration> + </execution> </executions> </plugin> </plugins> diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java index 521e0ce..3da6459 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java +++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java @@ -50,6 +50,7 @@ public class BatchWriterConfig implements Writable { private Integer maxWriteThreads = null; private Durability durability = Durability.DEFAULT; + private boolean isDurabilitySet = false; /** * Sets the maximum memory to batch before writing. The smaller this value, the more frequently the {@link BatchWriter} will write.<br> @@ -190,6 +191,7 @@ public class BatchWriterConfig implements Writable { */ public BatchWriterConfig setDurability(Durability durability) { this.durability = durability; + isDurabilitySet = true; return this; } @@ -309,6 +311,34 @@ public class BatchWriterConfig implements Writable { return false; } + private static <T> T merge(T o1, T o2) { + if (o1 != null) + return o1; + return o2; + } + + /** + * Merge this BatchWriterConfig with another. If config is set in both, preference will be given to this config. + * + * @param other + * Another BatchWriterConfig + * @return Merged BatchWriterConfig + * @since 2.0.0 + */ + public BatchWriterConfig merge(BatchWriterConfig other) { + BatchWriterConfig result = new BatchWriterConfig(); + result.maxMemory = merge(this.maxMemory, other.maxMemory); + result.maxLatency = merge(this.maxLatency, other.maxLatency); + result.timeout = merge(this.timeout, other.timeout); + result.maxWriteThreads = merge(this.maxWriteThreads, other.maxWriteThreads); + if (this.isDurabilitySet) { + result.durability = this.durability; + } else if (other.isDurabilitySet) { + result.durability = other.durability; + } + return result; + } + @Override public int hashCode() { HashCodeBuilder hcb = new HashCodeBuilder(); diff --git a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java index 6f0274c..a269229 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ClientConfiguration.java @@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory; * Contains a list of property keys recognized by the Accumulo client and convenience methods for setting them. * * @since 1.6.0 + * @deprecated since 2.0.0, replaced {@link Connector#builder()} */ public class ClientConfiguration { private static final Logger log = LoggerFactory.getLogger(ClientConfiguration.class); diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java b/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java new file mode 100644 index 0000000..9a43073 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/ConnectionInfo.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client; + +import java.util.Properties; + +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; + +/** + * Accumulo client connection information. Can be built using {@link Connector#builder()} + * + * @since 2.0.0 + */ +public interface ConnectionInfo { + + /** + * @return Accumulo instance name + */ + String getInstanceName(); + + /** + * @return Zookeeper connection information for Accumulo instance + */ + String getZookeepers(); + + /** + * @return Accumulo principal/username + */ + String getPrincipal(); + + /** + * @return {@link AuthenticationToken} used for this connection + */ + AuthenticationToken getAuthenticationToken(); + + /** + * @return All Accumulo client properties set for this connection + */ + Properties getProperties(); +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java index e36cc82..9ac7d71 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java @@ -16,11 +16,15 @@ */ package org.apache.accumulo.core.client; +import java.util.Properties; + import org.apache.accumulo.core.client.admin.InstanceOperations; import org.apache.accumulo.core.client.admin.NamespaceOperations; import org.apache.accumulo.core.client.admin.ReplicationOperations; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.impl.ConnectorImpl; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.security.Authorizations; /** @@ -77,6 +81,7 @@ public abstract class Connector { int maxWriteThreads) throws TableNotFoundException; /** + * Factory method to create BatchDeleter * * @param tableName * the name of the table to query and delete from @@ -87,7 +92,8 @@ public abstract class Connector { * @param numQueryThreads * the number of concurrent threads to spawn for querying * @param config - * configuration used to create batch writer + * configuration used to create batch writer. This config takes precedence. Any unset values will be merged with config set when the Connector was + * created. If no config was set during Connector creation, BatchWriterConfig defaults will be used. * @return BatchDeleter object for configuring and deleting * @since 1.5.0 */ @@ -96,6 +102,24 @@ public abstract class Connector { throws TableNotFoundException; /** + * Factory method to create BatchDeleter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig defaults will + * be used. + * + * @param tableName + * the name of the table to query and delete from + * @param authorizations + * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in + * must be a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are + * passed, then an exception will be thrown. + * @param numQueryThreads + * the number of concurrent threads to spawn for querying + * @return BatchDeleter object + * @throws TableNotFoundException + * if table not found + */ + public abstract BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException; + + /** * Factory method to create a BatchWriter connected to Accumulo. * * @param tableName @@ -121,7 +145,8 @@ public abstract class Connector { * @param tableName * the name of the table to insert data into * @param config - * configuration used to create batch writer + * configuration used to create batch writer. This config will take precedence. Any unset values will merged with config set when the Connector was + * created. If no config was set during Connector creation, BatchWriterConfig defaults will be used. * @return BatchWriter object for configuring and writing data to * @since 1.5.0 */ @@ -129,6 +154,19 @@ public abstract class Connector { public abstract BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException; /** + * Factory method to create a BatchWriter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig defaults will + * be used. + * + * @param tableName + * the name of the table to insert data into + * @return BatchWriter object + * @throws TableNotFoundException + * if table not found + * @since 2.0.0 + */ + public abstract BatchWriter createBatchWriter(String tableName) throws TableNotFoundException; + + /** * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables, which is good for * ingesting data into multiple tables from the same source * @@ -150,14 +188,23 @@ public abstract class Connector { * multiple tables can be sent to a server in a single batch. Its an efficient way to ingest data into multiple tables from a single process. * * @param config - * configuration used to create multi-table batch writer + * configuration used to create multi-table batch writer. This config will take precedence. Any unset values will merged with config set when the + * Connector was created. If no config was set during Connector creation, BatchWriterConfig defaults will be used. * @return MultiTableBatchWriter object for configuring and writing data to * @since 1.5.0 */ - public abstract MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config); /** + * Factory method to create a Multi-Table BatchWriter. This method uses BatchWriterConfig set when Connector was created. If none was set, BatchWriterConfig + * defaults will be used. + * + * @return MultiTableBatchWriter object + * @since 2.0.0 + */ + public abstract MultiTableBatchWriter createMultiTableBatchWriter(); + + /** * Factory method to create a Scanner connected to Accumulo. * * @param tableName @@ -237,4 +284,272 @@ public abstract class Connector { * @since 1.7.0 */ public abstract ReplicationOperations replicationOperations(); + + /** + * Builds ConnectionInfo after all options have been specified + * + * @since 2.0.0 + */ + public interface ConnInfoFactory { + + /** + * Builds ConnectionInfo after all options have been specified + * + * @return ConnectionInfo + */ + ConnectionInfo info(); + } + + /** + * Builds Connector + * + * @since 2.0.0 + */ + public interface ConnectorFactory extends ConnInfoFactory { + + /** + * Builds Connector after all options have been specified + * + * @return Connector + */ + Connector build() throws AccumuloException, AccumuloSecurityException; + + } + + /** + * Builder method for setting Accumulo instance and zookeepers + * + * @since 2.0.0 + */ + public interface InstanceArgs { + AuthenticationArgs forInstance(String instanceName, String zookeepers); + } + + /** + * Builder methods for creating Connector using properties + * + * @since 2.0.0 + */ + public interface PropertyOptions extends InstanceArgs { + + /** + * Build using properties file. An example properties file can be found at conf/accumulo-client.properties in the Accumulo tarball distribution. + * + * @param propertiesFile + * Path to properties file + * @return this builder + */ + ConnectorFactory usingProperties(String propertiesFile); + + /** + * Build using Java properties object. A list of available properties can be found in the documentation on the project website (http://accumulo.apache.org) + * under 'Development' -> 'Client Properties' + * + * @param properties + * Properties object + * @return this builder + */ + ConnectorFactory usingProperties(Properties properties); + } + + public interface ConnectionInfoOptions extends PropertyOptions { + + /** + * Build using connection information + * + * @param connectionInfo + * ConnectionInfo object + * @return this builder + */ + ConnectorFactory usingConnectionInfo(ConnectionInfo connectionInfo); + } + + /** + * Build methods for authentication + * + * @since 2.0.0 + */ + public interface AuthenticationArgs { + + /** + * Build using password-based credentials + * + * @param username + * User name + * @param password + * Password + * @return this builder + */ + ConnectionOptions usingPassword(String username, CharSequence password); + + /** + * Build using Kerberos credentials + * + * @param principal + * Principal + * @param keyTabFile + * Path to keytab file + * @return this builder + */ + ConnectionOptions usingKerberos(String principal, String keyTabFile); + + /** + * Build using credentials from a CredentialProvider + * + * @param username + * Accumulo user name + * @param name + * Alias to extract Accumulo user password from CredentialProvider + * @param providerUrls + * Comma seperated list of URLs defining CredentialProvider(s) + * @return this builder + */ + ConnectionOptions usingProvider(String username, String name, String providerUrls); + + /** + * Build using specified credentials + * + * @param principal + * Principal/username + * @param token + * Authentication token + * @return this builder + */ + ConnectionOptions usingToken(String principal, AuthenticationToken token); + } + + /** + * Build methods for SSL/TLS + * + * @since 2.0.0 + */ + public interface SslOptions extends ConnectorFactory { + + /** + * Build with SSL trust store + * + * @param path + * Path to trust store + * @return this builder + */ + SslOptions withTruststore(String path); + + /** + * Build with SSL trust store + * + * @param path + * Path to trust store + * @param password + * Password used to encrypt trust store + * @param type + * Trust store type + * @return this builder + */ + SslOptions withTruststore(String path, String password, String type); + + /** + * Build with SSL key store + * + * @param path + * Path to SSL key store + * @return this builder + */ + SslOptions withKeystore(String path); + + /** + * Build with SSL key store + * + * @param path + * Path to keystore + * @param password + * Password used to encyrpt key store + * @param type + * Key store type + * @return this builder + */ + SslOptions withKeystore(String path, String password, String type); + + /** + * Use JSSE system properties to configure SSL + * + * @return this builder + */ + SslOptions useJsse(); + } + + /** + * Build methods for SASL + * + * @since 2.0.0 + */ + public interface SaslOptions extends ConnectorFactory { + + /** + * Build with Kerberos Server Primary + * + * @param kerberosServerPrimary + * Kerberos server primary + * @return this builder + */ + SaslOptions withPrimary(String kerberosServerPrimary); + + /** + * Build with SASL quality of protection + * + * @param qualityOfProtection + * Quality of protection + * @return this builder + */ + SaslOptions withQop(String qualityOfProtection); + } + + /** + * Build methods for connection options + * + * @since 2.0.0 + */ + public interface ConnectionOptions extends ConnectorFactory { + + /** + * Build using Zookeeper timeout + * + * @param timeout + * Zookeeper timeout + * @return this builder + */ + ConnectionOptions withZkTimeout(int timeout); + + /** + * Build with SSL/TLS options + * + * @return this builder + */ + SslOptions withSsl(); + + /** + * Build with SASL options + * + * @return this builder + */ + SaslOptions withSasl(); + + /** + * Build with BatchWriterConfig defaults for BatchWriter, MultiTableBatchWriter & BatchDeleter + * + * @param batchWriterConfig + * BatchWriterConfig + * @return this builder + */ + ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig); + } + + /** + * Creates builder for Connector + * + * @return this builder + * @since 2.0.0 + */ + public static PropertyOptions builder() { + return new ConnectorImpl.ConnectorBuilderImpl(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java index b510c45..356fa02 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java @@ -26,8 +26,8 @@ import java.util.function.Predicate; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; @@ -54,15 +54,18 @@ public class ClientContext { protected final Instance inst; private Credentials creds; private ClientConfiguration clientConf; + private BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); private final AccumuloConfiguration rpcConf; protected Connector conn; - /** - * Instantiate a client context - */ public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) { + this(instance, credentials, clientConf, new BatchWriterConfig()); + } + + public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf, BatchWriterConfig batchWriterConfig) { this(instance, credentials, convertClientConfig(requireNonNull(clientConf, "clientConf is null"))); this.clientConf = clientConf; + this.batchWriterConfig = batchWriterConfig; } /** @@ -153,6 +156,10 @@ public class ClientContext { return conn; } + public BatchWriterConfig getBatchWriterConfig() { + return batchWriterConfig; + } + /** * Serialize the credentials just before initiating the RPC call */ @@ -200,9 +207,9 @@ public class ClientContext { else { // Reconstitute the server kerberos property from the client config if (Property.GENERAL_KERBEROS_PRINCIPAL == property) { - if (config.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { + if (config.containsKey(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { // Avoid providing a realm since we don't know what it is... - return config.getString(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm(); + return config.getString(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()) + "/_HOST@" + SaslConnectionParams.getDefaultRealm(); } } return defaults.get(property); @@ -222,8 +229,8 @@ public class ClientContext { // Two client props that don't exist on the server config. Client doesn't need to know about the Kerberos instance from the principle, but servers do // Automatically reconstruct the server property when converting a client config. - if (props.containsKey(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { - final String serverPrimary = props.remove(ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()); + if (props.containsKey(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey())) { + final String serverPrimary = props.remove(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY.getKey()); if (filter.test(Property.GENERAL_KERBEROS_PRINCIPAL.getKey())) { // Use the _HOST expansion. It should be unnecessary in "client land". props.put(Property.GENERAL_KERBEROS_PRINCIPAL.getKey(), serverPrimary + "/_HOST@" + SaslConnectionParams.getDefaultRealm()); diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java new file mode 100644 index 0000000..1aafc06 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoFactory.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Durability; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.conf.ClientProperty; + +/** + * Creates internal objects using {@link ConnectionInfo} + */ +public class ConnectionInfoFactory { + + public static String getString(ConnectionInfo info, ClientProperty property) { + return property.getValue(info.getProperties()); + } + + public static Long getLong(ConnectionInfo info, ClientProperty property) { + return property.getLong(info.getProperties()); + } + + public static Connector getConnector(ConnectionInfo info) throws AccumuloSecurityException, AccumuloException { + return new ConnectorImpl(getClientContext(info)); + } + + public static ClientContext getClientContext(ConnectionInfo info) { + return new ClientContext(getInstance(info), getCredentials(info), getClientConfiguration(info), getBatchWriterConfig(info)); + } + + public static Instance getInstance(ConnectionInfo info) { + return new ZooKeeperInstance(getClientConfiguration(info)); + } + + public static Credentials getCredentials(ConnectionInfo info) { + return new Credentials(info.getPrincipal(), info.getAuthenticationToken()); + } + + public static BatchWriterConfig getBatchWriterConfig(ConnectionInfo info) { + BatchWriterConfig batchWriterConfig = new BatchWriterConfig(); + Long maxMemory = getLong(info, ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES); + if (maxMemory != null) { + batchWriterConfig.setMaxMemory(maxMemory); + } + Long maxLatency = getLong(info, ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC); + if (maxLatency != null) { + batchWriterConfig.setMaxLatency(maxLatency, TimeUnit.SECONDS); + } + Long timeout = getLong(info, ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC); + if (timeout != null) { + batchWriterConfig.setTimeout(timeout, TimeUnit.SECONDS); + } + String durability = getString(info, ClientProperty.BATCH_WRITER_DURABILITY); + if (!durability.isEmpty()) { + batchWriterConfig.setDurability(Durability.valueOf(durability.toUpperCase())); + } + return batchWriterConfig; + } + + public static ClientConfiguration getClientConfiguration(ConnectionInfo info) { + ClientConfiguration config = ClientConfiguration.create(); + for (Object keyObj : info.getProperties().keySet()) { + String key = (String) keyObj; + String val = info.getProperties().getProperty(key); + if (key.equals(ClientProperty.INSTANCE_ZOOKEEPERS.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_ZK_HOST, val); + } else if (key.equals(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT_SEC.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_ZK_TIMEOUT, val); + } else if (key.equals(ClientProperty.SSL_ENABLED.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_ENABLED, val); + } else if (key.equals(ClientProperty.SSL_KEYSTORE_PATH.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_PATH, val); + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_CLIENT_AUTH, "true"); + } else if (key.equals(ClientProperty.SSL_KEYSTORE_TYPE.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_TYPE, val); + } else if (key.equals(ClientProperty.SSL_KEYSTORE_PASSWORD.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_KEYSTORE_PASSWORD, val); + } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_PATH.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PATH, val); + } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_TYPE.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PATH, val); + } else if (key.equals(ClientProperty.SSL_TRUSTSTORE_PASSWORD.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SSL_TRUSTSTORE_PATH, val); + } else if (key.equals(ClientProperty.SSL_USE_JSSE.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_USE_JSSE, val); + } else if (key.equals(ClientProperty.SASL_ENABLED.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.INSTANCE_RPC_SSL_ENABLED, val); + } else if (key.equals(ClientProperty.SASL_QOP.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.RPC_SASL_QOP, val); + } else if (key.equals(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY.getKey())) { + config.setProperty(ClientConfiguration.ClientProperty.KERBEROS_SERVER_PRIMARY, val); + } else { + config.setProperty(key, val); + } + } + return config; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java new file mode 100644 index 0000000..916625c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectionInfoImpl.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import java.util.Properties; + +import org.apache.accumulo.core.client.ConnectionInfo; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.conf.ClientProperty; + +public class ConnectionInfoImpl implements ConnectionInfo { + + private Properties properties; + private AuthenticationToken token; + + ConnectionInfoImpl(Properties properties, AuthenticationToken token) { + this.properties = properties; + this.token = token; + } + + @Override + public String getInstanceName() { + return getString(ClientProperty.INSTANCE_NAME); + } + + @Override + public String getZookeepers() { + return getString(ClientProperty.INSTANCE_ZOOKEEPERS); + } + + @Override + public String getPrincipal() { + return getString(ClientProperty.AUTH_USERNAME); + } + + @Override + public Properties getProperties() { + return properties; + } + + @Override + public AuthenticationToken getAuthenticationToken() { + return token; + } + + private String getString(ClientProperty property) { + return property.getValue(properties); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java index f49e4dc..03c719c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java @@ -18,6 +18,12 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; @@ -28,6 +34,7 @@ import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -41,6 +48,11 @@ import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.thrift.ClientService; import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.security.tokens.CredentialProviderToken; +import org.apache.accumulo.core.client.security.tokens.KerberosToken; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.trace.Tracer; @@ -54,7 +66,7 @@ public class ConnectorImpl extends Connector { private InstanceOperations instanceops = null; private ReplicationOperations replicationops = null; - public ConnectorImpl(final ClientContext context) throws AccumuloException, AccumuloSecurityException { + public ConnectorImpl(final ClientContext context) throws AccumuloSecurityException, AccumuloException { checkArgument(context != null, "Context is null"); checkArgument(context.getCredentials() != null, "Credentials are null"); checkArgument(context.getCredentials().getToken() != null, "Authentication token is null"); @@ -113,7 +125,12 @@ public class ConnectorImpl extends Connector { throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, config); + return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, numQueryThreads, config.merge(context.getBatchWriterConfig())); + } + + @Override + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { + return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig()); } @Deprecated @@ -127,7 +144,16 @@ public class ConnectorImpl extends Connector { @Override public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException { checkArgument(tableName != null, "tableName is null"); - return new BatchWriterImpl(context, getTableId(tableName), config); + // we used to allow null inputs for bw config + if (config == null) { + config = new BatchWriterConfig(); + } + return new BatchWriterImpl(context, getTableId(tableName), config.merge(context.getBatchWriterConfig())); + } + + @Override + public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException { + return createBatchWriter(tableName, new BatchWriterConfig()); } @Deprecated @@ -139,7 +165,12 @@ public class ConnectorImpl extends Connector { @Override public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { - return new MultiTableBatchWriterImpl(context, config); + return new MultiTableBatchWriterImpl(context, config.merge(context.getBatchWriterConfig())); + } + + @Override + public MultiTableBatchWriter createMultiTableBatchWriter() { + return createMultiTableBatchWriter(new BatchWriterConfig()); } @Override @@ -193,4 +224,222 @@ public class ConnectorImpl extends Connector { return replicationops; } + + public static class ConnectorBuilderImpl implements InstanceArgs, PropertyOptions, ConnectionInfoOptions, AuthenticationArgs, ConnectionOptions, SslOptions, + SaslOptions, ConnectorFactory { + + private Properties properties = new Properties(); + private AuthenticationToken token = null; + + @Override + public Connector build() throws AccumuloException, AccumuloSecurityException { + return ConnectionInfoFactory.getConnector(new ConnectionInfoImpl(properties, token)); + } + + @Override + public ConnectionInfo info() { + return new ConnectionInfoImpl(properties, token); + } + + @Override + public AuthenticationArgs forInstance(String instanceName, String zookeepers) { + setProperty(ClientProperty.INSTANCE_NAME, instanceName); + setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers); + return this; + } + + @Override + public SslOptions withTruststore(String path) { + setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path); + return this; + } + + @Override + public SslOptions withTruststore(String path, String password, String type) { + setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path); + setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password); + setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type); + return this; + } + + @Override + public SslOptions withKeystore(String path) { + setProperty(ClientProperty.SSL_KEYSTORE_PATH, path); + return this; + } + + @Override + public SslOptions withKeystore(String path, String password, String type) { + setProperty(ClientProperty.SSL_KEYSTORE_PATH, path); + setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password); + setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type); + return this; + } + + @Override + public SslOptions useJsse() { + setProperty(ClientProperty.SSL_USE_JSSE, "true"); + return this; + } + + @Override + public ConnectionOptions withZkTimeout(int timeout) { + setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT_SEC, Integer.toString(timeout)); + return this; + } + + @Override + public SslOptions withSsl() { + setProperty(ClientProperty.SSL_ENABLED, "true"); + return this; + } + + @Override + public SaslOptions withSasl() { + setProperty(ClientProperty.SASL_ENABLED, "true"); + return this; + } + + @Override + public ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig) { + setProperty(ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES, batchWriterConfig.getMaxMemory()); + setProperty(ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC, batchWriterConfig.getMaxLatency(TimeUnit.SECONDS)); + setProperty(ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC, batchWriterConfig.getTimeout(TimeUnit.SECONDS)); + setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS, batchWriterConfig.getMaxWriteThreads()); + setProperty(ClientProperty.BATCH_WRITER_DURABILITY, batchWriterConfig.getDurability().toString()); + return this; + } + + @Override + public SaslOptions withPrimary(String kerberosServerPrimary) { + setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary); + return this; + } + + @Override + public SaslOptions withQop(String qualityOfProtection) { + setProperty(ClientProperty.SASL_QOP, qualityOfProtection); + return this; + } + + @Override + public ConnectorFactory usingProperties(String configFile) { + Properties properties = new Properties(); + try (InputStream is = new FileInputStream(configFile)) { + properties.load(is); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return usingProperties(properties); + } + + @Override + public ConnectorFactory usingProperties(Properties properties) { + this.properties = properties; + String authMethod = ClientProperty.AUTH_METHOD.getValue(properties).toLowerCase(); + switch (authMethod) { + case "password": + String password = ClientProperty.AUTH_PASSWORD.getValue(properties); + Objects.nonNull(password); + this.token = new PasswordToken(password); + this.properties.remove(ClientProperty.AUTH_PASSWORD); + break; + case "kerberos": + String principal = ClientProperty.AUTH_USERNAME.getValue(properties); + String keytabPath = ClientProperty.AUTH_KERBEROS_KEYTAB_PATH.getValue(properties); + Objects.nonNull(principal); + Objects.nonNull(keytabPath); + try { + this.token = new KerberosToken(principal, new File(keytabPath)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + case "provider": + String name = ClientProperty.AUTH_PROVIDER_NAME.getValue(properties); + String providerUrls = ClientProperty.AUTH_PROVIDER_URLS.getValue(properties); + try { + this.token = new CredentialProviderToken(name, providerUrls); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + default: + throw new IllegalArgumentException("An authentication method (password, kerberos, etc) must be set"); + } + return this; + } + + @Override + public ConnectionOptions usingPassword(String username, CharSequence password) { + setProperty(ClientProperty.AUTH_METHOD, "password"); + setProperty(ClientProperty.AUTH_USERNAME, username); + this.token = new PasswordToken(password); + return this; + } + + @Override + public ConnectionOptions usingKerberos(String principal, String keyTabFile) { + setProperty(ClientProperty.AUTH_METHOD, "kerberos"); + setProperty(ClientProperty.AUTH_USERNAME, principal); + setProperty(ClientProperty.AUTH_KERBEROS_KEYTAB_PATH, keyTabFile); + try { + this.token = new KerberosToken(principal, new File(keyTabFile)); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return this; + } + + @Override + public ConnectionOptions usingProvider(String username, String name, String providerUrls) { + setProperty(ClientProperty.AUTH_METHOD, "provider"); + setProperty(ClientProperty.AUTH_USERNAME, username); + setProperty(ClientProperty.AUTH_PROVIDER_NAME, name); + setProperty(ClientProperty.AUTH_PROVIDER_URLS, providerUrls); + try { + this.token = new CredentialProviderToken(name, providerUrls); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return this; + } + + @Override + public ConnectionOptions usingToken(String principal, AuthenticationToken token) { + this.token = token; + setProperty(ClientProperty.AUTH_USERNAME, principal); + if (token instanceof CredentialProviderToken) { + setProperty(ClientProperty.AUTH_METHOD, "provider"); + CredentialProviderToken cpt = (CredentialProviderToken) token; + setProperty(ClientProperty.AUTH_PROVIDER_NAME, cpt.getName()); + setProperty(ClientProperty.AUTH_PROVIDER_URLS, cpt.getCredentialProviders()); + } else if (token instanceof PasswordToken) { + setProperty(ClientProperty.AUTH_METHOD, "password"); + } else if (token instanceof KerberosToken) { + setProperty(ClientProperty.AUTH_METHOD, "kerberos"); + setProperty(ClientProperty.AUTH_KERBEROS_KEYTAB_PATH, ((KerberosToken) token).getKeytab().getAbsolutePath()); + } else { + setProperty(ClientProperty.AUTH_METHOD, "unknown"); + } + return this; + } + + @Override + public ConnectorFactory usingConnectionInfo(ConnectionInfo connectionInfo) { + this.properties = connectionInfo.getProperties(); + this.token = connectionInfo.getAuthenticationToken(); + return this; + } + + public void setProperty(ClientProperty property, String value) { + properties.setProperty(property.getKey(), value); + } + + public void setProperty(ClientProperty property, Long value) { + setProperty(property, Long.toString(value)); + } + + public void setProperty(ClientProperty property, Integer value) { + setProperty(property, Integer.toString(value)); + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java index 28a3355..5583f9f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AbstractInputFormat.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; @@ -48,6 +49,7 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.Credentials; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.impl.OfflineScanner; @@ -116,6 +118,20 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { } /** + * Sets connection information needed to communicate with Accumulo for this job + * + * @param job + * Hadoop job instance to be configured + * @param info + * Connection information for Accumulo + * @since 2.0.0 + */ + public static void setConnectionInfo(JobConf job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + + /** * Sets the connector information needed to communicate with Accumulo in this job. * * <p> @@ -131,7 +147,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -172,7 +190,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { * @param tokenFile * the path to the token file * @since 1.6.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException { InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile); } @@ -228,7 +248,7 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { * @param zooKeepers * a comma-separated list of zookeeper servers * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead. + * @deprecated since 1.6.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ @Deprecated public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) { @@ -243,7 +263,9 @@ public abstract class AbstractInputFormat<K,V> implements InputFormat<K,V> { * @param clientConfig * client configuration containing connection options * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ + @Deprecated public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { InputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java index 5f00ec3..affb535 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloInputFormat.java @@ -19,8 +19,7 @@ package org.apache.accumulo.core.client.mapred; import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -39,10 +38,9 @@ import org.apache.log4j.Level; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} - * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} + * <li>{@link AccumuloInputFormat#setConnectionInfo(JobConf, ConnectionInfo)} + * <li>{@link AccumuloInputFormat#setInputTableName(JobConf, String)}</li> * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)} - * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} * </ul> * * Other static methods are optional. diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java index 3a2e3fa..3c0b4b6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloMultiTableInputFormat.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Map; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.mapreduce.InputTableConfig; import org.apache.accumulo.core.client.mapreduce.lib.impl.InputConfigurator; import org.apache.accumulo.core.data.Key; @@ -37,7 +38,7 @@ import org.apache.hadoop.mapred.Reporter; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, org.apache.accumulo.core.client.security.tokens.AuthenticationToken)} + * <li>{@link AccumuloInputFormat#setConnectionInfo(JobConf, ConnectionInfo)} * <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, org.apache.accumulo.core.security.Authorizations)} * <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java index 6f07ce7..426a4d6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; @@ -68,9 +70,7 @@ import org.apache.log4j.Logger; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} - * <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, String)} - * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} + * <li>{@link AccumuloOutputFormat#setConnectionInfo(JobConf, ConnectionInfo)} * </ul> * * Other static methods are optional. @@ -81,6 +81,20 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { protected static final Logger log = Logger.getLogger(CLASS); /** + * Set the connection information needed to communicate with Accumulo in this job. + * + * @param job + * Hadoop job to be configured + * @param info + * Accumulo connection information + * @since 2.0.0 + */ + public static void setConnectionInfo(JobConf job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + + /** * Sets the connector information needed to communicate with Accumulo in this job. * * <p> @@ -96,7 +110,9 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -137,7 +153,9 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { * @param tokenFile * the path to the password file * @since 1.6.0 + * @deprecated since 2.0.0, use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead */ + @Deprecated public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException { OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile); } @@ -215,9 +233,8 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { * @param zooKeepers * a comma-separated list of zookeeper servers * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(JobConf, ClientConfiguration)} instead. + * @deprecated since 1.6.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ - @Deprecated public static void setZooKeeperInstance(JobConf job, String instanceName, String zooKeepers) { setZooKeeperInstance(job, ClientConfiguration.create().withInstance(instanceName).withZkHosts(zooKeepers)); @@ -232,7 +249,9 @@ public class AccumuloOutputFormat implements OutputFormat<Text,Mutation> { * @param clientConfig * client configuration for specifying connection timeouts, SSL connection options, etc. * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(JobConf, ConnectionInfo)} instead. */ + @Deprecated public static void setZooKeeperInstance(JobConf job, ClientConfiguration clientConfig) { OutputConfigurator.setZooKeeperInstance(CLASS, job, clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java index 5049ef7..62b949a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloRowInputFormat.java @@ -19,9 +19,8 @@ package org.apache.accumulo.core.client.mapred; import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -40,10 +39,9 @@ import org.apache.hadoop.mapred.Reporter; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloRowInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)} + * <li>{@link AccumuloRowInputFormat#setConnectionInfo(JobConf, ConnectionInfo)} * <li>{@link AccumuloRowInputFormat#setInputTableName(JobConf, String)} * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(JobConf, Authorizations)} - * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(JobConf, ClientConfiguration)} * </ul> * * Other static methods are optional. diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java index fb36282..0ca7c2d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AbstractInputFormat.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ClientSideIteratorScanner; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; @@ -48,6 +49,7 @@ import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; import org.apache.accumulo.core.client.impl.ClientContext; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.Credentials; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.impl.OfflineScanner; @@ -119,6 +121,20 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { } /** + * Sets connection information needed to communicate with Accumulo for this job + * + * @param job + * Hadoop job instance to be configured + * @param info + * Connection information for Accumulo + * @since 2.0.0 + */ + public static void setConnectionInfo(Job job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + + /** * Sets the connector information needed to communicate with Accumulo in this job. * * <p> @@ -134,7 +150,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0; use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. */ + @Deprecated public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -253,7 +271,7 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @param zooKeepers * a comma-separated list of zookeeper servers * @since 1.5.0 - * @deprecated since 1.6.0; Use {@link #setZooKeeperInstance(Job, ClientConfiguration)} instead. + * @deprecated since 1.6.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. */ @Deprecated public static void setZooKeeperInstance(Job job, String instanceName, String zooKeepers) { @@ -269,7 +287,9 @@ public abstract class AbstractInputFormat<K,V> extends InputFormat<K,V> { * @param clientConfig * client configuration containing connection options * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. */ + @Deprecated public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) { InputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java index 837b3fe..441ac33 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormat.java @@ -19,8 +19,7 @@ package org.apache.accumulo.core.client.mapreduce; import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -39,9 +38,9 @@ import org.apache.log4j.Level; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} + * <li>{@link AccumuloInputFormat#setConnectionInfo(Job, ConnectionInfo)} + * <li>{@link AccumuloInputFormat#setInputTableName(Job, String)} * <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)} - * <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} * </ul> * * Other static methods are optional. diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java index e821b5d..af69e5a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.client.ZooKeeperInstance; import org.apache.accumulo.core.client.admin.DelegationTokenConfig; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.impl.AuthenticationTokenIdentifier; +import org.apache.accumulo.core.client.impl.ConnectionInfoFactory; import org.apache.accumulo.core.client.impl.DelegationTokenImpl; import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase; import org.apache.accumulo.core.client.mapreduce.lib.impl.OutputConfigurator; @@ -69,9 +71,7 @@ import org.apache.log4j.Logger; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, AuthenticationToken)} - * <li>{@link AccumuloOutputFormat#setConnectorInfo(Job, String, String)} - * <li>{@link AccumuloOutputFormat#setZooKeeperInstance(Job, ClientConfiguration)} + * <li>{@link AccumuloOutputFormat#setConnectionInfo(Job, ConnectionInfo)} * </ul> * * Other static methods are optional. @@ -82,6 +82,20 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { protected static final Logger log = Logger.getLogger(CLASS); /** + * Set the connection information needed to communicate with Accumulo in this job. + * + * @param job + * Hadoop job to be configured + * @param info + * Accumulo connection information + * @since 2.0.0 + */ + public static void setConnectionInfo(Job job, ConnectionInfo info) throws AccumuloSecurityException { + setConnectorInfo(job, info.getPrincipal(), info.getAuthenticationToken()); + setZooKeeperInstance(job, ConnectionInfoFactory.getClientConfiguration(info)); + } + + /** * Sets the connector information needed to communicate with Accumulo in this job. * * <p> @@ -97,7 +111,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @param token * the user's password * @since 1.5.0 + * @deprecated since 2.0.0, replaced by {@link #setConnectionInfo(Job, ConnectionInfo)} */ + @Deprecated public static void setConnectorInfo(Job job, String principal, AuthenticationToken token) throws AccumuloSecurityException { if (token instanceof KerberosToken) { log.info("Received KerberosToken, attempting to fetch DelegationToken"); @@ -138,7 +154,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @param tokenFile * the path to the token file * @since 1.6.0 + * @deprecated since 2.0.0, replaced by {@link #setConnectionInfo(Job, ConnectionInfo)} */ + @Deprecated public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException { OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile); } @@ -232,7 +250,9 @@ public class AccumuloOutputFormat extends OutputFormat<Text,Mutation> { * @param clientConfig * client configuration for specifying connection timeouts, SSL connection options, etc. * @since 1.6.0 + * @deprecated since 2.0.0; Use {@link #setConnectionInfo(Job, ConnectionInfo)} instead. */ + @Deprecated public static void setZooKeeperInstance(Job job, ClientConfiguration clientConfig) { OutputConfigurator.setZooKeeperInstance(CLASS, job.getConfiguration(), clientConfig); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java index 043f88a..4f6b2bd 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormat.java @@ -19,9 +19,8 @@ package org.apache.accumulo.core.client.mapreduce; import java.io.IOException; import java.util.Map.Entry; -import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.RowIterator; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; @@ -40,10 +39,9 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; * The user must specify the following via static configurator methods: * * <ul> - * <li>{@link AccumuloRowInputFormat#setConnectorInfo(Job, String, AuthenticationToken)} + * <li>{@link AccumuloRowInputFormat#setConnectionInfo(Job, ConnectionInfo)} * <li>{@link AccumuloRowInputFormat#setInputTableName(Job, String)} * <li>{@link AccumuloRowInputFormat#setScanAuthorizations(Job, Authorizations)} - * <li>{@link AccumuloRowInputFormat#setZooKeeperInstance(Job, ClientConfiguration)} * </ul> * * Other static methods are optional. diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java index 9b5601b..4918125 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java @@ -85,6 +85,11 @@ public class MockConnector extends Connector { config.getMaxWriteThreads()); } + @Override + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads) throws TableNotFoundException { + return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig()); + } + @Deprecated @Override public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException { @@ -98,6 +103,11 @@ public class MockConnector extends Connector { return createBatchWriter(tableName, config.getMaxMemory(), config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads()); } + @Override + public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException { + return createBatchWriter(tableName, new BatchWriterConfig()); + } + @Deprecated @Override public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) { @@ -110,6 +120,11 @@ public class MockConnector extends Connector { } @Override + public MultiTableBatchWriter createMultiTableBatchWriter() { + return createMultiTableBatchWriter(new BatchWriterConfig()); + } + + @Override public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { MockTable table = acu.tables.get(tableName); if (table == null) diff --git a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java index 5ac6f02..b39afe2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java +++ b/core/src/main/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderToken.java @@ -33,6 +33,9 @@ import org.apache.hadoop.conf.Configuration; public class CredentialProviderToken extends PasswordToken { public static final String NAME_PROPERTY = "name", CREDENTIAL_PROVIDERS_PROPERTY = "credentialProviders"; + private String name; + private String credentialProviders; + public CredentialProviderToken() { super(); } @@ -40,11 +43,12 @@ public class CredentialProviderToken extends PasswordToken { public CredentialProviderToken(String name, String credentialProviders) throws IOException { requireNonNull(name); requireNonNull(credentialProviders); - setWithCredentialProviders(name, credentialProviders); } protected void setWithCredentialProviders(String name, String credentialProviders) throws IOException { + this.name = name; + this.credentialProviders = credentialProviders; final Configuration conf = new Configuration(CachedConfiguration.getInstance()); conf.set(CredentialProviderFactoryShim.CREDENTIAL_PROVIDER_PATH, credentialProviders); @@ -57,6 +61,20 @@ public class CredentialProviderToken extends PasswordToken { setPassword(CharBuffer.wrap(password)); } + /** + * @return Name used to extract Accumulo user password from CredentialProvider + */ + public String getName() { + return name; + } + + /** + * @return CSV list of CredentialProvider(s) + */ + public String getCredentialProviders() { + return credentialProviders; + } + @Override public void init(Properties properties) { char[] nameCharArray = properties.get(NAME_PROPERTY), credentialProvidersCharArray = properties.get(CREDENTIAL_PROVIDERS_PROPERTY); diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java new file mode 100644 index 0000000..8218823 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.conf; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.FileNotFoundException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; + +import com.google.common.collect.Sets; + +/** + * Generates client-properties.md for documentation on Accumulo website and accumulo-client.properties for Accumulo distribution tarball + */ +class ClientConfigGenerate { + + private abstract class Format { + + abstract void beginSection(String section); + + abstract void pageHeader(); + + abstract void property(ClientProperty prop); + + void generate() { + pageHeader(); + + generateSection("Instance", "instance."); + generateSection("Authentication", "auth.", "auth.method", "auth.username"); + generateSection("Batch Writer", "batch.writer."); + generateSection("SSL", "ssl."); + generateSection("SASL", "sasl."); + generateSection("Tracing", "trace."); + + doc.close(); + } + + void generateSection(String section, String prefix, String... prefixProps) { + beginSection(section); + for (String prop : prefixProps) { + ClientProperty cp = sortedProps.get(prop); + if (cp != null) { + property(cp); + } + } + Set<String> prefixSet = Sets.newHashSet(prefixProps); + for (ClientProperty prop : sortedProps.values()) { + if (prop.getKey().startsWith(prefix) && !prefixSet.contains(prop.getKey())) { + property(prop); + } + } + } + + void generateSection(String section, String prefix) { + generateSection(section, prefix, ""); + } + } + + private class Markdown extends Format { + + @Override + void beginSection(String section) {} + + @Override + void pageHeader() { + doc.println("---"); + doc.println("title: Client Properties"); + doc.println("category: development"); + doc.println("order: 9"); + doc.println("---\n"); + doc.println("<!-- WARNING: Do not edit this file. It is a generated file that is copied from Accumulo build (from core/target/generated-docs) -->"); + doc.println("<!-- Generated by : " + getClass().getName() + " -->\n"); + doc.println("Below are properties set in `accumulo-client.properties` that configure Accumulo clients. All properties have been part of the API since 2.0.0 (unless otherwise specified):\n"); + doc.println("| Property | Default value | Since | Description |"); + doc.println("|----------|---------------|-------|-------------|"); + } + + @Override + void property(ClientProperty prop) { + Objects.nonNull(prop); + doc.print("| <a name=\"" + prop.getKey().replace(".", "_") + "\" class=\"prop\"></a> " + prop.getKey() + " | "); + String defaultValue = sanitize(prop.getDefaultValue()).trim(); + if (defaultValue.length() == 0) { + defaultValue = "*empty*"; + } + doc.println(defaultValue + " | " + prop.getSince() + " | " + sanitize(prop.getDescription() + " |")); + } + + String sanitize(String str) { + return str.replace("\n", "<br>"); + } + } + + private class ConfigFile extends Format { + + @Override + void beginSection(String section) { + doc.println("\n## " + section + " properties"); + doc.println("## --------------"); + } + + @Override + void pageHeader() { + doc.println("# Licensed to the Apache Software Foundation (ASF) under one or more"); + doc.println("# contributor license agreements. See the NOTICE file distributed with"); + doc.println("# this work for additional information regarding copyright ownership."); + doc.println("# The ASF licenses this file to You under the Apache License, Version 2.0"); + doc.println("# (the \"License\"); you may not use this file except in compliance with"); + doc.println("# the License. You may obtain a copy of the License at"); + doc.println("#"); + doc.println("# http://www.apache.org/licenses/LICENSE-2.0"); + doc.println("#"); + doc.println("# Unless required by applicable law or agreed to in writing, software"); + doc.println("# distributed under the License is distributed on an \"AS IS\" BASIS,"); + doc.println("# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied."); + doc.println("# See the License for the specific language governing permissions and"); + doc.println("# limitations under the License.\n"); + doc.println("################################"); + doc.println("## Accumulo client configuration"); + doc.println("################################\n"); + doc.println("## NOTE - All properties that have a default are set with it. Properties that"); + doc.println("## are uncommented must be set by the user."); + } + + @Override + void property(ClientProperty prop) { + doc.println("## " + prop.getDescription()); + if (!prop.isRequired()) { + doc.print("#"); + } + doc.println(prop.getKey() + "=" + prop.getDefaultValue() + "\n"); + } + } + + private PrintStream doc; + private final TreeMap<String,ClientProperty> sortedProps = new TreeMap<>(); + + private ClientConfigGenerate(PrintStream doc) { + Objects.nonNull(doc); + this.doc = doc; + for (ClientProperty prop : ClientProperty.values()) { + this.sortedProps.put(prop.getKey(), prop); + } + } + + private void generateMarkdown() { + new Markdown().generate(); + } + + private void generateConfigFile() { + new ConfigFile().generate(); + } + + /** + * Generates markdown and config files for Accumulo client properties. Arguments are: "--generate-markdown filename" or "--generate-config filename" + * + * @param args + * command-line arguments + * @throws IllegalArgumentException + * if args is invalid + */ + public static void main(String[] args) throws FileNotFoundException, UnsupportedEncodingException { + if (args.length == 2) { + ClientConfigGenerate clientConfigGenerate = new ClientConfigGenerate(new PrintStream(args[1], UTF_8.name())); + if (args[0].equals("--generate-markdown")) { + clientConfigGenerate.generateMarkdown(); + return; + } else if (args[0].equals("--generate-config")) { + clientConfigGenerate.generateConfigFile(); + return; + } + } + throw new IllegalArgumentException("Usage: " + ClientConfigGenerate.class.getName() + " [--generate-markdown|--generate-config] <filename>"); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java new file mode 100644 index 0000000..b645b10 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.conf; + +import java.util.Objects; +import java.util.Properties; + +import org.apache.accumulo.core.Constants; + +public enum ClientProperty { + + // Instance + INSTANCE_NAME("instance.name", "", "Name of Accumulo instance to connect to", "", true), + INSTANCE_ZOOKEEPERS("instance.zookeepers", "localhost:2181", "Zookeeper connection information for Accumulo instance", "", true), + INSTANCE_ZOOKEEPERS_TIMEOUT_SEC("instance.zookeepers.timeout.sec", "30", "Zookeeper session timeout (in seconds)"), + + // Authentication + AUTH_METHOD("auth.method", "password", "Authentication method (i.e password, kerberos, provider). Set more properties for chosen method below.", "", true), + AUTH_USERNAME("auth.username", "", "Accumulo username/principal for chosen authentication method", "", true), + AUTH_PASSWORD("auth.password", "", "Accumulo user password", "", true), + AUTH_KERBEROS_KEYTAB_PATH("auth.kerberos.keytab.path", "", "Path to Kerberos keytab"), + AUTH_PROVIDER_NAME("auth.provider.name", "", "Alias used to extract Accumulo user password from CredentialProvider"), + AUTH_PROVIDER_URLS("auth.provider.urls", "", "Comma separated list of URLs defining CredentialProvider(s)"), + + // BatchWriter + BATCH_WRITER_MAX_MEMORY_BYTES("batch.writer.max.memory.bytes", "52428800", "Max memory (in bytes) to batch before writing"), + BATCH_WRITER_MAX_LATENCY_SEC("batch.writer.max.latency.sec", "120", "Max amount of time (in seconds) to hold data in memory before flushing it"), + BATCH_WRITER_MAX_TIMEOUT_SEC("batch.writer.max.timeout.sec", "0", + "Max amount of time (in seconds) an unresponsive server will be re-tried. An exception is thrown when this timeout is exceeded. Set to zero for no timeout."), + BATCH_WRITER_MAX_WRITE_THREADS("batch.writer.max.write.threads", "3", "Maximum number of threads to use for writing data to tablet servers."), + BATCH_WRITER_DURABILITY("batch.writer.durability", "default", + "Change the durability for the BatchWriter session. To use the table's durability setting. use \"default\" which is the table's durability setting."), + + // SSL + SSL_ENABLED("ssl.enabled", "false", "Enable SSL for client RPC"), + SSL_KEYSTORE_PASSWORD("ssl.keystore.password", "", "Password used to encrypt keystore"), + SSL_KEYSTORE_PATH("ssl.keystore.path", "", "Path to SSL keystore file"), + SSL_KEYSTORE_TYPE("ssl.keystore.type", "jks", "Type of SSL keystore"), + SSL_TRUSTSTORE_PASSWORD("ssl.truststore.password", "", "Password used to encrypt truststore"), + SSL_TRUSTSTORE_PATH("ssl.truststore.path", "", "Path to SSL truststore file"), + SSL_TRUSTSTORE_TYPE("ssl.truststore.type", "jks", "Type of SSL truststore"), + SSL_USE_JSSE("ssl.use.jsse", "false", "Use JSSE system properties to configure SSL"), + + // SASL + SASL_ENABLED("sasl.enabled", "false", "Enable SASL for client RPC"), + SASL_QOP("sasl.qop", "auth", "SASL quality of protection. Valid values are 'auth', 'auth-int', and 'auth-conf'"), + SASL_KERBEROS_SERVER_PRIMARY("sasl.kerberos.server.primary", "accumulo", "Kerberos principal/primary that Accumulo servers use to login"), + + // Trace + TRACE_SPAN_RECEIVERS("trace.span.receivers", "org.apache.accumulo.tracer.ZooTraceClient", "A list of span receiver classes to send trace spans"), + TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS, "The zookeeper node where tracers are registered"); + + private String key; + private String defaultValue; + private String description; + private String since; + private boolean required; + + ClientProperty(String key, String defaultValue, String description, String since, boolean required) { + Objects.requireNonNull(key); + Objects.requireNonNull(defaultValue); + Objects.requireNonNull(description); + Objects.requireNonNull(since); + this.key = key; + this.defaultValue = defaultValue; + this.description = description; + this.since = since; + this.required = required; + } + + ClientProperty(String key, String defaultValue, String description, String since) { + this(key, defaultValue, description, since, false); + } + + ClientProperty(String key, String defaultValue, String description) { + this(key, defaultValue, description, ""); + } + + public String getKey() { + return key; + } + + public String getDefaultValue() { + return defaultValue; + } + + public String getDescription() { + return description; + } + + public String getSince() { + return since; + } + + public boolean isRequired() { + return required; + } + + public String getValue(Properties properties) { + Objects.requireNonNull(properties); + String value = properties.getProperty(getKey()); + if (value == null || value.isEmpty()) { + value = getDefaultValue(); + } + Objects.requireNonNull(value); + if (isRequired() && value.isEmpty()) { + throw new IllegalArgumentException(getKey() + " must be set!"); + } + return value; + } + + public Long getLong(Properties properties) { + String value = getValue(properties); + if (value.isEmpty()) { + return null; + } + return Long.parseLong(value); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java index 61245f8..91df2dc 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationDocGen.java @@ -80,11 +80,12 @@ class ConfigurationDocGen { @Override void pageHeader() { doc.println("---"); - doc.println("title: Configuration Properties"); + doc.println("title: Server Properties"); doc.println("category: administration"); doc.println("order: 3"); doc.println("---\n"); doc.println("<!-- WARNING: Do not edit this file. It is a generated file that is copied from Accumulo build (from core/target/generated-docs) -->\n"); + doc.println("Below are properties set in `accumulo-site.xml` or the Accumulo shell that configure Accumulo servers (i.e tablet server, master, etc):\n"); } @Override diff --git a/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java b/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java index 9a32c26..786a7bd 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java @@ -27,6 +27,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.concurrent.TimeUnit; +import org.junit.Assert; import org.junit.Test; /** @@ -201,6 +202,24 @@ public class BatchWriterConfigTest { assertEquals(cfg1.hashCode(), cfg2.hashCode()); } + @Test + public void testMerge() { + BatchWriterConfig cfg1 = new BatchWriterConfig(), cfg2 = new BatchWriterConfig(); + cfg1.setMaxMemory(1234); + cfg2.setMaxMemory(5858); + cfg2.setDurability(Durability.LOG); + cfg2.setMaxLatency(456, TimeUnit.MILLISECONDS); + + Assert.assertEquals(Durability.DEFAULT, cfg1.getDurability()); + + BatchWriterConfig merged = cfg1.merge(cfg2); + + Assert.assertEquals(1234, merged.getMaxMemory()); + Assert.assertEquals(Durability.LOG, merged.getDurability()); + Assert.assertEquals(456, merged.getMaxLatency(TimeUnit.MILLISECONDS)); + Assert.assertEquals(3, merged.getMaxWriteThreads()); + } + private byte[] createBytes(BatchWriterConfig bwConfig) throws IOException { ByteArrayOutputStream baos = new ByteArrayOutputStream(); bwConfig.write(new DataOutputStream(baos)); diff --git a/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java index d6c7025..eb562d9 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/security/tokens/CredentialProviderTokenTest.java @@ -56,6 +56,8 @@ public class CredentialProviderTokenTest { } CredentialProviderToken token = new CredentialProviderToken("root.password", keystorePath); + Assert.assertEquals("root.password", token.getName()); + Assert.assertEquals(keystorePath, token.getCredentialProviders()); Assert.assertArrayEquals("password".getBytes(UTF_8), token.getPassword()); token = new CredentialProviderToken("bob.password", keystorePath); diff --git a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java index e278fce..fdb8cfa 100644 --- a/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java +++ b/test/src/main/java/org/apache/accumulo/harness/AccumuloClusterHarness.java @@ -28,6 +28,7 @@ import org.apache.accumulo.cluster.ClusterUser; import org.apache.accumulo.cluster.ClusterUsers; import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; @@ -251,6 +252,11 @@ public abstract class AccumuloClusterHarness extends AccumuloITBase implements M return clusterConf.getAdminPrincipal(); } + public static ConnectionInfo getConnectionInfo() { + return Connector.builder().forInstance(getCluster().getInstanceName(), getCluster().getZooKeepers()).usingToken(getAdminPrincipal(), getAdminToken()) + .info(); + } + public static AuthenticationToken getAdminToken() { checkState(initialized); return clusterConf.getAdminToken(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java index 999c1e9..c243f76 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConfigurableMacBase.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.ZooKeeperInstance; @@ -172,6 +173,10 @@ public class ConfigurableMacBase extends AccumuloITBase { return getCluster().getConnector("root", new PasswordToken(ROOT_PASSWORD)); } + protected ConnectionInfo getConnectionInfo() { + return Connector.builder().forInstance(getCluster().getInstanceName(), getCluster().getZooKeepers()).usingPassword("root", ROOT_PASSWORD).info(); + } + protected Process exec(Class<?> clazz, String... args) throws IOException { return getCluster().exec(clazz, args); } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java new file mode 100644 index 0000000..5846ec0 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/ConnectorIT.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.util.Properties; + +import org.apache.accumulo.core.client.ConnectionInfo; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.junit.Assert; +import org.junit.Test; + +public class ConnectorIT extends AccumuloClusterHarness { + + @Test + public void testConnectorBuilder() throws Exception { + Connector c = getConnector(); + String instanceName = c.getInstance().getInstanceName(); + String zookeepers = c.getInstance().getZooKeepers(); + final String user = "testuser"; + final String password = "testpassword"; + c.securityOperations().createLocalUser(user, new PasswordToken(password)); + + Connector conn = Connector.builder().forInstance(instanceName, zookeepers).usingPassword(user, password).build(); + + Assert.assertEquals(instanceName, conn.getInstance().getInstanceName()); + Assert.assertEquals(zookeepers, conn.getInstance().getZooKeepers()); + Assert.assertEquals(user, conn.whoami()); + + ConnectionInfo info = Connector.builder().forInstance(instanceName, zookeepers).usingPassword(user, password).info(); + Assert.assertEquals(instanceName, info.getInstanceName()); + Assert.assertEquals(zookeepers, info.getZookeepers()); + Assert.assertEquals(user, info.getPrincipal()); + Assert.assertTrue(info.getAuthenticationToken() instanceof PasswordToken); + + Properties props = new Properties(); + props.put(ClientProperty.INSTANCE_NAME.getKey(), instanceName); + props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers); + props.put(ClientProperty.AUTH_USERNAME.getKey(), user); + props.put(ClientProperty.AUTH_PASSWORD.getKey(), password); + conn = Connector.builder().usingProperties(props).build(); + + Assert.assertEquals(instanceName, conn.getInstance().getInstanceName()); + Assert.assertEquals(zookeepers, conn.getInstance().getZooKeepers()); + Assert.assertEquals(user, conn.whoami()); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java index cf002dd..c3047ed 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java @@ -122,9 +122,8 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness { job.setInputFormat(AccumuloInputFormat.class); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); - AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); AccumuloInputFormat.setBatchScan(job, batchScan); if (sample) { AccumuloInputFormat.setSamplerConfiguration(job, SAMPLER_CONFIG); @@ -215,10 +214,9 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness { Connector connector = getConnector(); connector.tableOperations().create(table); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setScanAuthorizations(job, auths); - AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); AccumuloInputFormat.setScanIsolation(job, isolated); AccumuloInputFormat.setLocalIterators(job, localIters); AccumuloInputFormat.fetchColumns(job, fetchColumns); diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java index eb12f1c..a2f3918 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloOutputFormatIT.java @@ -31,13 +31,12 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ConnectionInfo; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mapred.AccumuloInputFormat; import org.apache.accumulo.core.client.mapred.AccumuloOutputFormat; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; @@ -83,9 +82,8 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase { // set the max memory so that we ensure we don't flush on the write. batchConfig.setMaxMemory(Long.MAX_VALUE); AccumuloOutputFormat outputFormat = new AccumuloOutputFormat(); + AccumuloOutputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloOutputFormat.setBatchWriterOptions(job, batchConfig); - AccumuloOutputFormat.setZooKeeperInstance(job, cluster.getClientConfig()); - AccumuloOutputFormat.setConnectorInfo(job, "root", new PasswordToken(ROOT_PASSWORD)); RecordWriter<Text,Mutation> writer = outputFormat.getRecordWriter(null, job, "Test", null); try { @@ -122,7 +120,7 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase { OutputCollector<Text,Mutation> finalOutput; @Override - public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) throws IOException { + public void map(Key k, Value v, OutputCollector<Text,Mutation> output, Reporter reporter) { finalOutput = output; try { if (key != null) @@ -167,11 +165,10 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase { job.setInputFormat(AccumuloInputFormat.class); - ClientConfiguration clientConfig = ClientConfiguration.create().withInstance(instanceName).withZkHosts(zooKeepers); + ConnectionInfo info = Connector.builder().forInstance(instanceName, zooKeepers).usingPassword(user, pass).info(); - AccumuloInputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); + AccumuloInputFormat.setConnectionInfo(job, info); AccumuloInputFormat.setInputTableName(job, table1); - AccumuloInputFormat.setZooKeeperInstance(job, clientConfig); job.setMapperClass(TestMapper.class); job.setMapOutputKeyClass(Key.class); @@ -180,10 +177,9 @@ public class AccumuloOutputFormatIT extends ConfigurableMacBase { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Mutation.class); - AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass)); + AccumuloOutputFormat.setConnectionInfo(job, info); AccumuloOutputFormat.setCreateTables(job, false); AccumuloOutputFormat.setDefaultTableName(job, table2); - AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig); job.setNumReduceTasks(0); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java index 7d44833..2dad20e 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java @@ -298,8 +298,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness { job.setInputFormatClass(inputFormatClass); - AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig()); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setBatchScan(job, batchScan); if (sample) { @@ -409,9 +408,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness { Connector connector = getConnector(); connector.tableOperations().create(table); - AccumuloInputFormat.setZooKeeperInstance(job, cluster.getClientConfig()); - AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken()); - + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table); AccumuloInputFormat.setScanAuthorizations(job, auths); AccumuloInputFormat.setScanIsolation(job, isolated); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java index ff57722..31dd458 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloOutputFormatIT.java @@ -94,9 +94,8 @@ public class AccumuloOutputFormatIT extends AccumuloClusterHarness { job.setInputFormatClass(AccumuloInputFormat.class); - AccumuloInputFormat.setConnectorInfo(job, user, pass); + AccumuloInputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloInputFormat.setInputTableName(job, table1); - AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); job.setMapperClass(TestMapper.class); job.setMapOutputKeyClass(Key.class); @@ -105,10 +104,9 @@ public class AccumuloOutputFormatIT extends AccumuloClusterHarness { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Mutation.class); - AccumuloOutputFormat.setConnectorInfo(job, user, pass); + AccumuloOutputFormat.setConnectionInfo(job, getConnectionInfo()); AccumuloOutputFormat.setCreateTables(job, false); AccumuloOutputFormat.setDefaultTableName(job, table2); - AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); job.setNumReduceTasks(0); -- To stop receiving notification emails like this one, please contact mwa...@apache.org.