http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java new file mode 100644 index 0000000..4ebb3e2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/JarScan.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.expr.fn.registry; + +import org.apache.drill.common.scanner.persistence.ScanResult; + +/** + * Holder class that contains: + * <ol> + * <li>jar name</li> + * <li>scan of packages, classes, annotations found in jar</li> + * <li>unique jar classLoader</li> + * </ol> + */ +public class JarScan { + + private final String jarName; + private final ScanResult scanResult; + private final ClassLoader classLoader; + + public JarScan(String jarName, ScanResult scanResult, ClassLoader classLoader) { + this.jarName = jarName; + this.scanResult = scanResult; + this.classLoader = classLoader; + } + + public String getJarName() { + return jarName; + } + + public ClassLoader getClassLoader() { + return classLoader; + } + + public ScanResult getScanResult() { + return scanResult; + } +}
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java new file mode 100644 index 0000000..03fd608 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java @@ -0,0 +1,329 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.expr.fn.registry; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.calcite.sql.SqlOperator; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.common.scanner.persistence.AnnotatedClassDescriptor; +import org.apache.drill.common.scanner.persistence.ScanResult; +import org.apache.drill.exec.exception.FunctionValidationException; +import org.apache.drill.exec.exception.JarValidationException; +import org.apache.drill.exec.expr.fn.DrillFuncHolder; +import org.apache.drill.exec.expr.fn.FunctionConverter; +import org.apache.drill.exec.planner.logical.DrillConstExecutor; +import org.apache.drill.exec.planner.sql.DrillOperatorTable; +import org.apache.drill.exec.planner.sql.DrillSqlAggOperator; +import org.apache.drill.exec.planner.sql.DrillSqlAggOperatorWithoutInference; +import org.apache.drill.exec.planner.sql.DrillSqlOperator; + +import com.google.common.collect.ArrayListMultimap; +import org.apache.drill.exec.planner.sql.DrillSqlOperatorWithoutInference; + +/** + * Registry of Drill functions. + */ +public class LocalFunctionRegistry { + + public static final String BUILT_IN = "built-in"; + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalFunctionRegistry.class); + private static final String functionSignaturePattern = "%s(%s)"; + + private static final ImmutableMap<String, Pair<Integer, Integer>> registeredFuncNameToArgRange = ImmutableMap.<String, Pair<Integer, Integer>> builder() + // CONCAT is allowed to take [1, infinity) number of arguments. 
+ // Currently, this flexibility is offered by DrillOptiq to rewrite it as + // a nested structure + .put("CONCAT", Pair.of(1, Integer.MAX_VALUE)) + + // When LENGTH is given two arguments, this function relies on DrillOptiq to rewrite it as + // another function based on the second argument (encodingType) + .put("LENGTH", Pair.of(1, 2)) + + // Dummy functions + .put("CONVERT_TO", Pair.of(2, 2)) + .put("CONVERT_FROM", Pair.of(2, 2)) + .put("FLATTEN", Pair.of(1, 1)).build(); + + private final FunctionRegistryHolder registryHolder; + + /** Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in. + * Built-in functions are not allowed to be unregistered. */ + public LocalFunctionRegistry(ScanResult classpathScan) { + registryHolder = new FunctionRegistryHolder(); + validate(BUILT_IN, classpathScan); + register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader()))); + if (logger.isTraceEnabled()) { + StringBuilder allFunctions = new StringBuilder(); + for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) { + allFunctions.append(method.toString()).append("\n"); + } + logger.trace("Registered functions: [\n{}]", allFunctions); + } + } + + /** + * @return local function registry version number + */ + public long getVersion() { + return registryHolder.getVersion(); + } + + /** + * Validates all functions, present in jars. 
+ * Will throw {@link FunctionValidationException} if: + * <ol> + * <li>Jar with the same name has been already registered.</li> + * <li>Conflicting function with the similar signature is found.</li> + * <li>Aggregating function is not deterministic.</li> + *</ol> + * @param jarName jar name to be validated + * @param scanResult scan of all classes present in jar + * @return list of validated function signatures + */ + public List<String> validate(String jarName, ScanResult scanResult) { + List<String> functions = Lists.newArrayList(); + FunctionConverter converter = new FunctionConverter(); + List<AnnotatedClassDescriptor> providerClasses = scanResult.getAnnotatedClasses(); + + if (registryHolder.containsJar(jarName)) { + throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName)); + } + + final ListMultimap<String, String> allFuncWithSignatures = registryHolder.getAllFunctionsWithSignatures(); + + for (AnnotatedClassDescriptor func : providerClasses) { + DrillFuncHolder holder = converter.getHolder(func, ClassLoader.getSystemClassLoader()); + if (holder != null) { + String functionInput = holder.getInputParameters(); + + String[] names = holder.getRegisteredNames(); + for (String name : names) { + String functionName = name.toLowerCase(); + String functionSignature = String.format(functionSignaturePattern, functionName, functionInput); + + if (allFuncWithSignatures.get(functionName).contains(functionSignature)) { + throw new FunctionValidationException(String.format("Found duplicated function in %s: %s", + registryHolder.getJarNameByFunctionSignature(functionName, functionSignature), functionSignature)); + } else if (holder.isAggregating() && !holder.isDeterministic()) { + throw new FunctionValidationException( + String.format("Aggregate functions must be deterministic: %s", func.getClassName())); + } else { + functions.add(functionSignature); + allFuncWithSignatures.put(functionName, functionSignature); + } + } + } 
else { + logger.warn("Unable to initialize function for class {}", func.getClassName()); + } + } + return functions; + } + + /** + * Registers all functions present in jar. + * If jar name is already registered, all jar related functions will be overridden. + * To prevent classpath collisions during loading and unloading jars, + * each jar is shipped with its own class loader. + * + * @param jars list of jars to be registered + */ + public void register(List<JarScan> jars) { + Map<String, List<FunctionHolder>> newJars = Maps.newHashMap(); + for (JarScan jarScan : jars) { + FunctionConverter converter = new FunctionConverter(); + List<AnnotatedClassDescriptor> providerClasses = jarScan.getScanResult().getAnnotatedClasses(); + List<FunctionHolder> functions = Lists.newArrayList(); + newJars.put(jarScan.getJarName(), functions); + for (AnnotatedClassDescriptor func : providerClasses) { + DrillFuncHolder holder = converter.getHolder(func, jarScan.getClassLoader()); + if (holder != null) { + String functionInput = holder.getInputParameters(); + String[] names = holder.getRegisteredNames(); + for (String name : names) { + String functionName = name.toLowerCase(); + String functionSignature = String.format(functionSignaturePattern, functionName, functionInput); + functions.add(new FunctionHolder(functionName, functionSignature, holder)); + } + } + } + } + registryHolder.addJars(newJars); + } + + /** + * Removes all function associated with the given jar name. + * Functions marked as built-in is not allowed to be unregistered. + * If user attempts to unregister built-in functions, logs warning and does nothing. + * Jar name is case-sensitive. 
+ * + * @param jarName jar name to be unregistered + */ + public void unregister(String jarName) { + if (BUILT_IN.equals(jarName)) { + logger.warn("Functions marked as built-in are not allowed to be unregistered."); + return; + } + registryHolder.removeJar(jarName); + } + + /** + * Returns list of jar names registered in function registry. + * + * @return list of jar names + */ + public List<String> getAllJarNames() { + return registryHolder.getAllJarNames(); + } + + /** + * @return quantity of all registered functions + */ + public int size(){ + return registryHolder.functionsSize(); + } + + /** + * @param name function name + * @return all function holders associated with the function name. Function name is case insensitive. + */ + public List<DrillFuncHolder> getMethods(String name, AtomicLong version) { + return registryHolder.getHoldersByFunctionName(name.toLowerCase(), version); + } + + public List<DrillFuncHolder> getMethods(String name) { + return registryHolder.getHoldersByFunctionName(name.toLowerCase()); + } + + /** + * Registers all functions present in {@link DrillOperatorTable}, + * also sets local registry version used at the moment of registering. 
+ * + * @param operatorTable drill operator table + */ + public void register(DrillOperatorTable operatorTable) { + AtomicLong versionHolder = new AtomicLong(); + final Map<String, Collection<DrillFuncHolder>> registeredFunctions = registryHolder.getAllFunctionsWithHolders(versionHolder).asMap(); + operatorTable.setFunctionRegistryVersion(versionHolder.get()); + registerOperatorsWithInference(operatorTable, registeredFunctions); + registerOperatorsWithoutInference(operatorTable, registeredFunctions); + } + + private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) { + final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap(); + final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap(); + for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) { + final ArrayListMultimap<Pair<Integer, Integer>, DrillFuncHolder> functions = ArrayListMultimap.create(); + final ArrayListMultimap<Integer, DrillFuncHolder> aggregateFunctions = ArrayListMultimap.create(); + final String name = function.getKey().toUpperCase(); + boolean isDeterministic = true; + for (DrillFuncHolder func : function.getValue()) { + final int paramCount = func.getParamCount(); + if(func.isAggregating()) { + aggregateFunctions.put(paramCount, func); + } else { + final Pair<Integer, Integer> argNumberRange; + if(registeredFuncNameToArgRange.containsKey(name)) { + argNumberRange = registeredFuncNameToArgRange.get(name); + } else { + argNumberRange = Pair.of(func.getParamCount(), func.getParamCount()); + } + functions.put(argNumberRange, func); + } + + if(!func.isDeterministic()) { + isDeterministic = false; + } + } + for (Entry<Pair<Integer, Integer>, Collection<DrillFuncHolder>> entry : functions.asMap().entrySet()) { + final Pair<Integer, Integer> range = entry.getKey(); + final int max = range.getRight(); + final int min = 
range.getLeft(); + if(!map.containsKey(name)) { + map.put(name, new DrillSqlOperator.DrillSqlOperatorBuilder() + .setName(name)); + } + + final DrillSqlOperator.DrillSqlOperatorBuilder drillSqlOperatorBuilder = map.get(name); + drillSqlOperatorBuilder + .addFunctions(entry.getValue()) + .setArgumentCount(min, max) + .setDeterministic(isDeterministic); + } + for (Entry<Integer, Collection<DrillFuncHolder>> entry : aggregateFunctions.asMap().entrySet()) { + if(!mapAgg.containsKey(name)) { + mapAgg.put(name, new DrillSqlAggOperator.DrillSqlAggOperatorBuilder().setName(name)); + } + + final DrillSqlAggOperator.DrillSqlAggOperatorBuilder drillSqlAggOperatorBuilder = mapAgg.get(name); + drillSqlAggOperatorBuilder + .addFunctions(entry.getValue()) + .setArgumentCount(entry.getKey(), entry.getKey()); + } + } + + for(final Entry<String, DrillSqlOperator.DrillSqlOperatorBuilder> entry : map.entrySet()) { + operatorTable.addOperatorWithInference( + entry.getKey(), + entry.getValue().build()); + } + + for(final Entry<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> entry : mapAgg.entrySet()) { + operatorTable.addOperatorWithInference( + entry.getKey(), + entry.getValue().build()); + } + } + + private void registerOperatorsWithoutInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) { + SqlOperator op; + for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) { + Set<Integer> argCounts = Sets.newHashSet(); + String name = function.getKey().toUpperCase(); + for (DrillFuncHolder func : function.getValue()) { + if (argCounts.add(func.getParamCount())) { + if (func.isAggregating()) { + op = new DrillSqlAggOperatorWithoutInference(name, func.getParamCount()); + } else { + boolean isDeterministic; + // prevent Drill from folding constant functions with types that cannot be materialized + // into literals + if 
(DrillConstExecutor.NON_REDUCIBLE_TYPES.contains(func.getReturnType().getMinorType())) { + isDeterministic = false; + } else { + isDeterministic = func.isDeterministic(); + } + op = new DrillSqlOperatorWithoutInference(name, func.getParamCount(), func.getReturnType(), isDeterministic); + } + operatorTable.addOperatorWithoutInference(function.getKey(), op); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java new file mode 100644 index 0000000..4ce4a19 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.expr.fn.registry; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.store.TransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreListener; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.exception.VersionMismatchException; +import org.apache.drill.exec.proto.SchemaUserBitShared; +import org.apache.drill.exec.proto.UserBitShared.Registry; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.File; +import java.io.IOException; + +import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT; + +/** + * Is responsible for remote function registry management. + * Creates all remote registry areas at startup and validates them, + * during init establishes connections with three udf related stores. + * Provides tools to work with three udf related stores, gives access to remote registry areas. 
+ * + * There are three udf stores: + * REGISTRY - persistent store, stores remote function registry {@link Registry} under udf path + * which contains information about all dynamically registered jars and their function signatures. + * If connection is created for the first time, puts empty remote registry. + * + * UNREGISTRATION - transient store, stores information under udf/unregister path. + * udf/unregister path is persistent by itself but any child created will be transient. + * Whenever user submits request to unregister jar, child path with jar name is created under this store. + * This store also holds unregistration listener, which notifies all drill bits when child path is created, + * so they can start local unregistration process. + * + * JARS - transient store, stores information under udf/jars path. + * udf/jars path is persistent by itself but any child created will be transient. + * Servers as lock, not allowing to perform any action on the same time. + * There two types of actions: {@link Action#REGISTRATION} and {@link Action#UNREGISTRATION}. + * Before starting any action, users tries to create child path with jar name under this store + * and if such path already exists, receives action being performed on that very jar. + * When user finishes its action, he deletes child path with jar name. + * + * There are three udf areas: + * STAGING - area where user copies binary and source jars before starting registration process. + * REGISTRY - area where registered jars are stored. + * TMP - area where source and binary jars are backed up in unique folder during registration process. 
+ */ +public class RemoteFunctionRegistry implements AutoCloseable { + + public static final String REGISTRY = "registry"; + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFunctionRegistry.class); + private static final ObjectMapper mapper = new ObjectMapper().enable(INDENT_OUTPUT); + + private final TransientStoreListener unregistrationListener; + private int retryAttempts; + private FileSystem fs; + private Path registryArea; + private Path stagingArea; + private Path tmpArea; + + private PersistentStore<Registry> registry; + private TransientStore<String> unregistration; + private TransientStore<String> jars; + + public RemoteFunctionRegistry(TransientStoreListener unregistrationListener) { + this.unregistrationListener = unregistrationListener; + } + + public void init(DrillConfig config, PersistentStoreProvider storeProvider, ClusterCoordinator coordinator) { + prepareStores(storeProvider, coordinator); + prepareAreas(config); + this.retryAttempts = config.getInt(ExecConstants.UDF_RETRY_ATTEMPTS); + } + + public Registry getRegistry() { + return registry.get(REGISTRY, null); + } + + public Registry getRegistry(DataChangeVersion version) { + return registry.get(REGISTRY, version); + } + + public void updateRegistry(Registry registryContent, DataChangeVersion version) throws VersionMismatchException { + registry.put(REGISTRY, registryContent, version); + } + + public void submitForUnregistration(String jar) { + unregistration.putIfAbsent(jar, jar); + } + + public void finishUnregistration(String jar) { + unregistration.remove(jar); + } + + public String addToJars(String jar, Action action) { + return jars.putIfAbsent(jar, action.toString()); + } + + public void removeFromJars(String jar) { + jars.remove(jar); + } + + public int getRetryAttempts() { + return retryAttempts; + } + + public FileSystem getFs() { + return fs; + } + + public Path getRegistryArea() { + return registryArea; + } + + public Path getStagingArea() { 
+ return stagingArea; + } + + public Path getTmpArea() { + return tmpArea; + } + + /** + * Connects to three stores: REGISTRY, UNREGISTRATION, JARS. + * Puts in REGISTRY store with default instance of remote function registry if store is initiated for the first time. + * Registers unregistration listener in UNREGISTRATION store. + */ + private void prepareStores(PersistentStoreProvider storeProvider, ClusterCoordinator coordinator) { + try { + PersistentStoreConfig<Registry> registrationConfig = PersistentStoreConfig + .newProtoBuilder(SchemaUserBitShared.Registry.WRITE, SchemaUserBitShared.Registry.MERGE) + .name("udf") + .persist() + .build(); + registry = storeProvider.getOrCreateStore(registrationConfig); + registry.putIfAbsent(REGISTRY, Registry.getDefaultInstance()); + } catch (StoreException e) { + throw new DrillRuntimeException("Failure while loading remote registry.", e); + } + + TransientStoreConfig<String> unregistrationConfig = TransientStoreConfig. + newJacksonBuilder(mapper, String.class).name("udf/unregister").build(); + unregistration = coordinator.getOrCreateTransientStore(unregistrationConfig); + unregistration.addListener(unregistrationListener); + + TransientStoreConfig<String> jarsConfig = TransientStoreConfig. + newJacksonBuilder(mapper, String.class).name("udf/jars").build(); + jars = coordinator.getOrCreateTransientStore(jarsConfig); + } + + /** + * Creates if absent and validates three udf areas: STAGING, REGISTRY and TMP. + * Generated udf ares root from {@link ExecConstants#UDF_DIRECTORY_ROOT}, + * if not set, uses user home directory instead. 
+ */ + private void prepareAreas(DrillConfig config) { + Configuration conf = new Configuration(); + if (config.hasPath(ExecConstants.UDF_DIRECTORY_FS)) { + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.getString(ExecConstants.UDF_DIRECTORY_FS)); + } + + try { + this.fs = FileSystem.get(conf); + } catch (IOException e) { + DrillRuntimeException.format(e, "Error during file system %s setup", conf.get(FileSystem.FS_DEFAULT_NAME_KEY)); + } + + String root = fs.getHomeDirectory().toUri().getPath(); + if (config.hasPath(ExecConstants.UDF_DIRECTORY_ROOT)) { + root = config.getString(ExecConstants.UDF_DIRECTORY_ROOT); + } + + this.registryArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_REGISTRY)); + this.stagingArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_STAGING)); + this.tmpArea = createArea(fs, root, config.getString(ExecConstants.UDF_DIRECTORY_TMP)); + } + + /** + * Concatenates udf are with root directory. + * Creates udf area, if area does not exist. + * Checks if area exists and is directory, if it is writable for current user, + * throws {@link DrillRuntimeException} otherwise. + * + * @param fs file system where area should be created or checked + * @param root root directory + * @param directory directory path + * @return path to area + */ + private Path createArea(FileSystem fs, String root, String directory) { + Path path = new Path(new File(root, directory).toURI().getPath()); + String fullPath = path.toUri().getPath(); + try { + fs.mkdirs(path); + Preconditions.checkState(fs.exists(path), "Area [%s] must exist", fullPath); + FileStatus fileStatus = fs.getFileStatus(path); + Preconditions.checkState(fileStatus.isDirectory(), "Area [%s] must be a directory", fullPath); + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + FsPermission permission = fileStatus.getPermission(); + // It is considered that current user has write rights on directory if: + // 1. 
current user is owner of the directory and has write rights + // 2. current user is in group that has write rights + // 3. any user has write rights + Preconditions.checkState( + (currentUser.getUserName().equals(fileStatus.getOwner()) + && permission.getUserAction().implies(FsAction.WRITE)) || + (Lists.newArrayList(currentUser.getGroupNames()).contains(fileStatus.getGroup()) + && permission.getGroupAction().implies(FsAction.WRITE)) || + permission.getOtherAction().implies(FsAction.WRITE), + "Area [%s] must be writable and executable for application user", fullPath); + } catch (Exception e) { + DrillRuntimeException.format(e, "Error during udf area creation [%s] on file system [%s]", fullPath, fs.getUri()); + } + return path; + } + + @Override + public void close() { + try { + AutoCloseables.close( + fs, + registry, + unregistration, + jars); + } catch (Exception e) { + logger.warn("Failure on close()", e); + } + } + + public enum Action { + REGISTRATION, + UNREGISTRATION + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 44e33cb..ceb1224 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -28,6 +28,7 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.LogicalPlanPersistence; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.planner.physical.PlannerSettings; import 
org.apache.drill.exec.planner.sql.DrillOperatorTable; @@ -222,6 +223,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem return queryContextInfo; } + public RemoteFunctionRegistry getRemoteFunctionRegistry() { + return drillbitContext.getRemoteFunctionRegistry(); + } + @Override public ContextInformation getContextInformation() { return contextInformation; http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java index 5f489b4..6944a7f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java @@ -22,9 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.calcite.sql.SqlAggFunction; import org.apache.calcite.sql.SqlFunction; -import org.apache.calcite.sql.SqlPrefixOperator; import org.apache.drill.common.expression.FunctionCallFactory; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.expr.fn.DrillFuncHolder; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.calcite.sql.SqlFunctionCategory; @@ -35,10 +33,10 @@ import org.apache.calcite.sql.SqlSyntax; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.server.options.SystemOptionManager; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; /** * Implementation of {@link SqlOperatorTable} that contains 
standard operators and functions provided through @@ -54,6 +52,9 @@ public class DrillOperatorTable extends SqlStdOperatorTable { private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithoutInferenceMap = ArrayListMultimap.create(); private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create(); + // indicates local function registry version based on which drill operator were loaded + // is used to define if we need to reload operator table in case when function signature was not found + private long functionRegistryVersion; private final OptionManager systemOptionManager; @@ -64,6 +65,23 @@ public class DrillOperatorTable extends SqlStdOperatorTable { this.systemOptionManager = systemOptionManager; } + /** Cleans up all operator holders and reloads operators */ + public void reloadOperators(FunctionImplementationRegistry registry) { + drillOperatorsWithoutInference.clear(); + drillOperatorsWithInference.clear(); + drillOperatorsWithoutInferenceMap.clear(); + drillOperatorsWithInferenceMap.clear(); + registry.register(this); + } + + public long setFunctionRegistryVersion(long version) { + return functionRegistryVersion = version; + } + + public long getFunctionRegistryVersion() { + return functionRegistryVersion; + } + /** * When the option planner.type_inference.enable is turned off, the operators which are added via this method * will be used. 
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index dbe620d..19123d0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -24,6 +24,9 @@ import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.FunctionNotFoundException; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.physical.PhysicalPlan; @@ -91,7 +94,7 @@ public class DrillSqlWorker { } try { - return handler.getPlan(sqlNode); + return getPhysicalPlan(handler, sqlNode, context); } catch(ValidationException e) { String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); throw UserException.validationError(e) @@ -108,5 +111,27 @@ public class DrillSqlWorker { } } + /** + * Returns query physical plan. + * In case of {@link FunctionNotFoundException}, if dynamic UDF support is enabled, attempts to load remote functions. + * If at least one function was loaded or the local function registry version has changed, + * makes one more attempt to get query physical plan.
+ */ + private static PhysicalPlan getPhysicalPlan(AbstractSqlHandler handler, SqlNode sqlNode, QueryContext context) + throws RelConversionException, IOException, ForemanSetupException, ValidationException { + try { + return handler.getPlan(sqlNode); + } catch (FunctionNotFoundException e) { + if (context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) { + DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable(); + FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry(); + if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) { + drillOperatorTable.reloadOperators(functionRegistry); + return handler.getPlan(sqlNode); + } + } + throw e; + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java index 3d0d538..0c3c6a0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java @@ -54,11 +54,15 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable; import org.apache.calcite.sql.validate.AggregatingSelectScope; import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; +import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.sql.validate.SqlValidatorImpl; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.sql2rel.SqlToRelConverter; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import 
org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.exception.FunctionNotFoundException; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.planner.cost.DrillCostBase; @@ -160,6 +164,11 @@ public class SqlConverter { SqlNode validatedNode = validator.validate(parsedNode); return validatedNode; } catch (RuntimeException e) { + final Throwable rootCause = ExceptionUtils.getRootCause(e); + if (rootCause instanceof SqlValidatorException + && StringUtils.contains(rootCause.getMessage(), "No match found for function signature")) { + throw new FunctionNotFoundException(rootCause.getMessage(), e); + } UserException.Builder builder = UserException .validationError(e) .addContext("SQL Query", sql); http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java new file mode 100644 index 0000000..8515c8a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.handlers; + +import com.google.common.collect.Lists; +import com.google.common.io.Files; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.commons.io.FileUtils; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.FunctionValidationException; +import org.apache.drill.exec.exception.JarValidationException; +import org.apache.drill.exec.exception.VersionMismatchException; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.sql.DirectPlan; +import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction; +import org.apache.drill.exec.proto.UserBitShared.Jar; +import org.apache.drill.exec.proto.UserBitShared.Registry; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; +import org.apache.drill.exec.util.JarUtil; +import org.apache.drill.exec.work.foreman.ForemanSetupException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +public class CreateFunctionHandler extends DefaultSqlHandler { + + private static org.slf4j.Logger 
logger = org.slf4j.LoggerFactory.getLogger(CreateFunctionHandler.class); + + public CreateFunctionHandler(SqlHandlerConfig config) { + super(config); + } + + /** + * Registers UDFs dynamically. Process consists of several steps: + * <ol> + * <li>Registering jar in jar registry to ensure that several jars with the same name is not registered.</li> + * <li>Binary and source jars validation and back up.</li> + * <li>Validation against local function registry.</li> + * <li>Validation against remote function registry.</li> + * <li>Remote function registry update.</li> + * <li>Copying of jars to registry area and clean up.</li> + * </ol> + * + * UDFs registration is allowed only if dynamic UDFs support is enabled. + * + * @return - Single row indicating list of registered UDFs, or error message otherwise. + */ + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException { + if (!context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) { + throw UserException.validationError() + .message("Dynamic UDFs support is disabled.") + .build(logger); + } + RemoteFunctionRegistry remoteRegistry = context.getRemoteFunctionRegistry(); + JarManager jarManager = new JarManager(sqlNode, remoteRegistry); + + boolean inProgress = false; + try { + final String action = remoteRegistry.addToJars(jarManager.getBinaryName(), RemoteFunctionRegistry.Action.REGISTRATION); + if (!(inProgress = action == null)) { + return DirectPlan.createDirectPlan(context, false, + String.format("Jar with %s name is used. 
Action: %s", jarManager.getBinaryName(), action)); + } + + jarManager.initRemoteBackup(); + List<String> functions = validateAgainstLocalRegistry(jarManager, context.getFunctionRegistry()); + initRemoteRegistration(functions, jarManager, remoteRegistry, remoteRegistry.getRetryAttempts()); + jarManager.deleteQuietlyFromStagingArea(); + + return DirectPlan.createDirectPlan(context, true, + String.format("The following UDFs in jar %s have been registered:\n%s", jarManager.getBinaryName(), functions)); + + } catch (Exception e) { + logger.error("Error during UDF registration", e); + return DirectPlan.createDirectPlan(context, false, e.getMessage()); + } finally { + if (inProgress) { + remoteRegistry.removeFromJars(jarManager.getBinaryName()); + } + jarManager.cleanUp(); + } + } + + + /** + * Initiates copying of the binary to the local file system + * and validates functions from this jar against local function registry. + * + * @param jarManager helps with copying the binary to the local file system + * @param localFunctionRegistry instance of local function registry to instantiate local validation + * @return list of validated function signatures + * @throws IOException in case of problems during copying binary to local file system + * @throws FunctionValidationException in case duplicated function was found + */ + private List<String> validateAgainstLocalRegistry(JarManager jarManager, + FunctionImplementationRegistry localFunctionRegistry) throws IOException { + Path localBinary = jarManager.copyBinaryToLocal(); + return localFunctionRegistry.validate(localBinary); + } + + /** + * Validates jar and its functions against remote jars. + * First checks if there is no duplicate by jar name and then looks for duplicates among functions.
+ * + * @param remoteJars list of remote jars to validate against + * @param jarName jar name to be validated + * @param functions list of functions present in jar to be validated + * @throws JarValidationException in case of jar with the same name was found + * @throws FunctionValidationException in case duplicated function was found + */ + private void validateAgainstRemoteRegistry(List<Jar> remoteJars, String jarName, List<String> functions) { + for (Jar remoteJar : remoteJars) { + if (remoteJar.getName().equals(jarName)) { + throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName)); + } + for (String remoteFunction : remoteJar.getFunctionSignatureList()) { + for (String func : functions) { + if (remoteFunction.equals(func)) { + throw new FunctionValidationException( + String.format("Found duplicated function in %s: %s", remoteJar.getName(), remoteFunction)); + } + } + } + } + } + + /** + * Instantiates remote registration. First gets remote function registry with version. + * Version is used to ensure that we update the same registry we validated against. + * Then validates against list of remote jars. + * If validation is successful, starts updating remote function registry. + * If during update {@link VersionMismatchException} was detected, + * calls itself recursively to instantiate new remote registration process. + * Since remote registry version has changed, we need to re-validate against remote function registry one more time. + * Each time recursive call occurs, decreases retry attempts counter by one. + * If retry attempts number hits 0, throws exception that failed to update remote function registry. 
+ * + * @param functions list of functions present in jar + * @param jarManager helper class for copying jars to registry area + * @param remoteRegistry remote function registry + * @param retryAttempts number of retry attempts + * @throws IOException in case of problems with copying jars to registry area + */ + private void initRemoteRegistration(List<String> functions, + JarManager jarManager, + RemoteFunctionRegistry remoteRegistry, + int retryAttempts) throws IOException { + DataChangeVersion version = new DataChangeVersion(); + List<Jar> remoteJars = remoteRegistry.getRegistry(version).getJarList(); + validateAgainstRemoteRegistry(remoteJars, jarManager.getBinaryName(), functions); + jarManager.copyToRegistryArea(); + boolean cleanUp = true; + List<Jar> jars = Lists.newArrayList(remoteJars); + jars.add(Jar.newBuilder().setName(jarManager.getBinaryName()).addAllFunctionSignature(functions).build()); + Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build(); + try { + remoteRegistry.updateRegistry(updatedRegistry, version); + cleanUp = false; + } catch (VersionMismatchException ex) { + if (retryAttempts-- == 0) { + throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit."); + } + initRemoteRegistration(functions, jarManager, remoteRegistry, retryAttempts); + } finally { + if (cleanUp) { + jarManager.deleteQuietlyFromRegistryArea(); + } + } + } + + /** + * Inner helper class that encapsulates logic for working with jars. + * During initialization it creates path to staging jar, local and remote temporary jars, registry jars. + * Is responsible for validation, copying and deletion actions. 
+ */ + private class JarManager { + + private final String binaryName; + private final FileSystem fs; + + private final Path remoteTmpDir; + private final Path localTmpDir; + + private final Path stagingBinary; + private final Path stagingSource; + + private final Path tmpRemoteBinary; + private final Path tmpRemoteSource; + + private final Path registryBinary; + private final Path registrySource; + + JarManager(SqlNode sqlNode, RemoteFunctionRegistry remoteRegistry) throws ForemanSetupException { + SqlCreateFunction node = unwrap(sqlNode, SqlCreateFunction.class); + this.binaryName = ((SqlCharStringLiteral) node.getJar()).toValue(); + String sourceName = JarUtil.getSourceName(binaryName); + + this.stagingBinary = new Path(remoteRegistry.getStagingArea(), binaryName); + this.stagingSource = new Path(remoteRegistry.getStagingArea(), sourceName); + + this.remoteTmpDir = new Path(remoteRegistry.getTmpArea(), UUID.randomUUID().toString()); + this.tmpRemoteBinary = new Path(remoteTmpDir, binaryName); + this.tmpRemoteSource = new Path(remoteTmpDir, sourceName); + + this.registryBinary = new Path(remoteRegistry.getRegistryArea(), binaryName); + this.registrySource = new Path(remoteRegistry.getRegistryArea(), sourceName); + + this.localTmpDir = new Path(Files.createTempDir().toURI()); + this.fs = remoteRegistry.getFs(); + } + + /** + * @return binary jar name + */ + String getBinaryName() { + return binaryName; + } + + /** + * Validates that both binary and source jar are present in staging area, + * it is expected that binary and source have standard naming convention. + * Backs up both jars to unique folder in remote temporary area. 
+ * + * @throws IOException in case of binary or source absence or problems during copying jars + */ + void initRemoteBackup() throws IOException { + fs.getFileStatus(stagingBinary); + fs.getFileStatus(stagingSource); + fs.mkdirs(remoteTmpDir); + FileUtil.copy(fs, stagingBinary, fs, tmpRemoteBinary, false, true, fs.getConf()); + FileUtil.copy(fs, stagingSource, fs, tmpRemoteSource, false, true, fs.getConf()); + } + + /** + * Copies binary jar to unique folder on local file system. + * Source jar is not needed for local validation. + * + * @return path to local binary jar + * @throws IOException in case of problems during copying binary jar + */ + Path copyBinaryToLocal() throws IOException { + Path localBinary = new Path(localTmpDir, binaryName); + fs.copyToLocalFile(tmpRemoteBinary, localBinary); + return localBinary; + } + + /** + * Copies binary and source jars to registry area, + * in case of {@link IOException} removes copied jar(-s) from registry area + * + * @throws IOException is re-thrown in case of problems during copying process + */ + void copyToRegistryArea() throws IOException { + FileUtil.copy(fs, tmpRemoteBinary, fs, registryBinary, false, true, fs.getConf()); + try { + FileUtil.copy(fs, tmpRemoteSource, fs, registrySource, false, true, fs.getConf()); + } catch (IOException e) { + deleteQuietly(registryBinary, false); + throw new IOException(e); + } + } + + /** + * Deletes binary and sources jars from staging area, in case of problems, logs warning and proceeds. + */ + void deleteQuietlyFromStagingArea() { + deleteQuietly(stagingBinary, false); + deleteQuietly(stagingSource, false); + } + + /** + * Deletes binary and sources jars from registry area, in case of problems, logs warning and proceeds. + */ + void deleteQuietlyFromRegistryArea() { + deleteQuietly(registryBinary, false); + deleteQuietly(registrySource, false); + } + + /** + * Removes quietly remote and local unique folders in temporary directories. 
+ */ + void cleanUp() { + FileUtils.deleteQuietly(new File(localTmpDir.toUri())); + deleteQuietly(remoteTmpDir, true); + } + + /** + * Deletes quietly file or directory, in case of errors, logs warning and proceeds. + * + * @param path path to file or directory + * @param isDirectory set to true if we need to delete a directory + */ + private void deleteQuietly(Path path, boolean isDirectory) { + try { + fs.delete(path, isDirectory); + } catch (IOException e) { + logger.warn(String.format("Error during deletion [%s]", path.toUri().getPath()), e); + } + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java new file mode 100644 index 0000000..5269a4b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.handlers; + +import com.google.common.collect.Lists; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.VersionMismatchException; +import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.sql.DirectPlan; +import org.apache.drill.exec.planner.sql.parser.SqlDropFunction; +import org.apache.drill.exec.proto.UserBitShared.Jar; +import org.apache.drill.exec.proto.UserBitShared.Registry; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; +import org.apache.drill.exec.util.JarUtil; +import org.apache.drill.exec.work.foreman.ForemanSetupException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.List; + +public class DropFunctionHandler extends DefaultSqlHandler { + + private static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DropFunctionHandler.class); + + public DropFunctionHandler(SqlHandlerConfig config) { + super(config); + } + + /** + * Unregisters UDFs dynamically. Process consists of several steps: + * <ol> + * <li>Registering jar in jar registry to ensure that several jars with the same name is not being unregistered.</li> + * <li>Starts remote unregistration process, gets list of all jars and excludes jar to be deleted.</li> + * <li>Signals drill bits to start local unregistration process.</li> + * <li>Removes source and binary jars from registry area.</li> + * </ol> + * + * UDFs unregistration is allowed only if dynamic UDFs support is enabled. 
+ * Only jars registered dynamically can be unregistered, + * built-in functions loaded at start up are not allowed to be unregistered. + * + * Limitation: before jar unregistration make sure no one is using functions from this jar. + * There is no guarantee that running queries will finish successfully or give correct result. + * + * @return - Single row indicating list of unregistered UDFs, raise exception otherwise + */ + @Override + public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException { + if (!context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) { + throw UserException.validationError() + .message("Dynamic UDFs support is disabled.") + .build(logger); + } + + SqlDropFunction node = unwrap(sqlNode, SqlDropFunction.class); + String jarName = ((SqlCharStringLiteral) node.getJar()).toValue(); + RemoteFunctionRegistry remoteFunctionRegistry = context.getRemoteFunctionRegistry(); + + boolean inProgress = false; + try { + final String action = remoteFunctionRegistry.addToJars(jarName, RemoteFunctionRegistry.Action.UNREGISTRATION); + if (!(inProgress = action == null)) { + return DirectPlan.createDirectPlan(context, false, String.format("Jar with %s name is used. 
Action: %s", jarName, action)); + } + + Jar deletedJar = unregister(jarName, remoteFunctionRegistry, remoteFunctionRegistry.getRetryAttempts()); + if (deletedJar == null) { + return DirectPlan.createDirectPlan(context, false, String.format("Jar %s is not registered in remote registry", jarName)); + } + remoteFunctionRegistry.submitForUnregistration(jarName); + + removeJarFromArea(jarName, remoteFunctionRegistry.getFs(), remoteFunctionRegistry.getRegistryArea()); + removeJarFromArea(JarUtil.getSourceName(jarName), remoteFunctionRegistry.getFs(), remoteFunctionRegistry.getRegistryArea()); + + return DirectPlan.createDirectPlan(context, true, + String.format("The following UDFs in jar %s have been unregistered:\n%s", jarName, deletedJar.getFunctionSignatureList())); + + } catch (Exception e) { + logger.error("Error during UDF unregistration", e); + return DirectPlan.createDirectPlan(context, false, e.getMessage()); + } finally { + if (inProgress) { + remoteFunctionRegistry.finishUnregistration(jarName); + remoteFunctionRegistry.removeFromJars(jarName); + } + } + } + + /** + * First gets remote function registry with version. + * Version is used to ensure that we update the same registry we removed jars from. + * Looks for a jar to be deleted; if it finds one, + * attempts to update remote registry with an updated list of jars that excludes the jar to be deleted. + * If during update {@link VersionMismatchException} was detected, + * calls itself recursively to instantiate new remote unregistration process. + * Since remote registry version has changed we need to look for jar to be deleted one more time. + * Each time recursive call occurs, decreases retry attempts counter by one. + * If retry attempts number hits 0, throws exception that failed to update remote function registry.
+ * + * @param jarName jar name + * @param remoteFunctionRegistry remote function registry + * @param retryAttempts number of retry attempts + * @return jar that was unregistered, null otherwise + */ + private Jar unregister(String jarName, RemoteFunctionRegistry remoteFunctionRegistry, int retryAttempts) { + DataChangeVersion version = new DataChangeVersion(); + Registry registry = remoteFunctionRegistry.getRegistry(version); + Jar jarToBeDeleted = null; + List<Jar> jars = Lists.newArrayList(); + for (Jar j : registry.getJarList()) { + if (j.getName().equals(jarName)) { + jarToBeDeleted = j; + } else { + jars.add(j); + } + } + if (jarToBeDeleted != null) { + Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build(); + try { + remoteFunctionRegistry.updateRegistry(updatedRegistry, version); + } catch (VersionMismatchException ex) { + if (retryAttempts-- == 0) { + throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit."); + } + unregister(jarName, remoteFunctionRegistry, retryAttempts); + } + } + return jarToBeDeleted; + } + + /** + * Removes jar from indicated area, in case of error log it and proceeds. 
+ * + * @param jarName jar name + * @param fs file system + * @param area path to area + */ + private void removeJarFromArea(String jarName, FileSystem fs, Path area) { + try { + fs.delete(new Path(area, jarName), false); + } catch (IOException e) { + logger.error("Error removing jar {} from area {}", jarName, area.toUri().getPath()); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java index fa0d319..53e3cd5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java @@ -174,6 +174,8 @@ public class CompoundIdentifierConverter extends SqlShuttle { rules.put(SqlRefreshMetadata.class, R(D)); rules.put(SqlSetOption.class, R(D, D, D)); rules.put(SqlDescribeSchema.class, R(D)); + rules.put(SqlCreateFunction.class, R(D)); + rules.put(SqlDropFunction.class, R(D)); REWRITE_RULES = ImmutableMap.copyOf(rules); } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java new file mode 100644 index 0000000..c14f468 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateFunction.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.parser; + +import com.google.common.collect.Lists; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.CreateFunctionHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; + +import java.util.List; + +public class SqlCreateFunction extends DrillSqlCall { + + private final SqlNode jar; + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_FUNCTION", SqlKind.OTHER) { + @Override + public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... 
operands) { + return new SqlCreateFunction(pos, operands[0]); + } + }; + + public SqlCreateFunction(SqlParserPos pos, SqlNode jar) { + super(pos); + this.jar = jar; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + List<SqlNode> opList = Lists.newArrayList(); + opList.add(jar); + return opList; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + writer.keyword("FUNCTION"); + writer.keyword("USING"); + writer.keyword("JAR"); + jar.unparse(writer, leftPrec, rightPrec); + } + + @Override + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new CreateFunctionHandler(config); + } + + public SqlNode getJar() { return jar; } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java new file mode 100644 index 0000000..77d2b76 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropFunction.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.sql.parser; + +import com.google.common.collect.Lists; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlLiteral; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.DropFunctionHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; + +import java.util.List; + +public class SqlDropFunction extends DrillSqlCall { + + private final SqlNode jar; + + public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP_FUNCTION", SqlKind.OTHER) { + @Override + public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... 
operands) { + return new SqlDropFunction(pos, operands[0]); + } + }; + + public SqlDropFunction(SqlParserPos pos, SqlNode jar) { + super(pos); + this.jar = jar; + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List<SqlNode> getOperandList() { + List<SqlNode> opList = Lists.newArrayList(); + opList.add(jar); + return opList; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("DROP"); + writer.keyword("FUNCTION"); + writer.keyword("USING"); + writer.keyword("JAR"); + jar.unparse(writer, leftPrec, rightPrec); + } + + @Override + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new DropFunctionHandler(config); + } + + public SqlNode getJar() { return jar; } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java index bd7c779..3eed022 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/InboundImpersonationManager.java @@ -29,7 +29,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.proto.UserBitShared.UserCredentials; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.server.options.TypeValidators; +import org.apache.drill.exec.server.options.TypeValidators.StringValidator; import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -84,10 +84,10 @@ public class 
InboundImpersonationManager { /** * Validator for impersonation policies. */ - public static class InboundImpersonationPolicyValidator extends TypeValidators.AdminOptionValidator { + public static class InboundImpersonationPolicyValidator extends StringValidator { public InboundImpersonationPolicyValidator(String name, String def) { - super(name, def); + super(name, def, true); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 55a2b05..3f74268 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -123,6 +123,7 @@ public class Drillbit implements AutoCloseable { storageRegistry.init(); drillbitContext.getOptionManager().init(); javaPropertiesToSystemOptions(); + manager.getContext().getRemoteFunctionRegistry().init(context.getConfig(), storeProvider, coord); registrationHandle = coord.register(md); webServer.start(); http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 1af6d11..3eb87ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -29,6 +29,7 @@ import org.apache.drill.common.scanner.persistence.ScanResult; import org.apache.drill.exec.compile.CodeCompiler; import 
org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; import org.apache.drill.exec.planner.PhysicalPlanReader; @@ -180,8 +181,12 @@ public class DrillbitContext implements AutoCloseable { return classpathScan; } + public RemoteFunctionRegistry getRemoteFunctionRegistry() { return functionRegistry.getRemoteFunctionRegistry(); } + @Override public void close() throws Exception { getOptionManager().close(); + getFunctionImplementationRegistry().close(); + getRemoteFunctionRegistry().close(); } } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java index db42603..82f4ab9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java @@ -26,9 +26,16 @@ public abstract class OptionValidator { // Stored here as well as in the option static class to allow insertion of option optionName into // the error messages produced by the validator private final String optionName; + private final boolean isAdminOption; + /** By default, if admin option value is not specified, it would be set to false.*/ public OptionValidator(String optionName) { + this(optionName, false); + } + + public OptionValidator(String optionName, boolean isAdminOption) { this.optionName = optionName; + this.isAdminOption = isAdminOption; } /** @@ -69,6 +76,13 @@ public abstract class 
OptionValidator { } /** + * @return true is option is system-level property that can be only specified by admin (not user). + */ + public boolean isAdminOption() { + return isAdminOption; + } + + /** * Gets the default option value for this validator. * * @return default option value http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 115ea47..ee94493 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -147,7 +147,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea ExecConstants.IMPLICIT_FQN_COLUMN_LABEL_VALIDATOR, ExecConstants.IMPLICIT_FILEPATH_COLUMN_LABEL_VALIDATOR, ExecConstants.CODE_GEN_EXP_IN_METHOD_SIZE_VALIDATOR, - ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR + ExecConstants.CREATE_PREPARE_STATEMENT_TIMEOUT_MILLIS_VALIDATOR, + ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED_VALIDATOR }; final Map<String, OptionValidator> tmp = new HashMap<>(); for (final OptionValidator validator : validators) { http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java index d015040..b4074ab 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/TypeValidators.java @@ -141,25 +141,41 @@ public class TypeValidators { public static class BooleanValidator extends TypeValidator { public BooleanValidator(String name, boolean def) { - super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def)); + this(name, def, false); + } + + public BooleanValidator(String name, boolean def, boolean isAdminOption) { + super(name, Kind.BOOLEAN, OptionValue.createBoolean(OptionType.SYSTEM, name, def), isAdminOption); } } public static class StringValidator extends TypeValidator { public StringValidator(String name, String def) { - super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def)); + this(name, def, false); + } + + public StringValidator(String name, String def, boolean isAdminOption) { + super(name, Kind.STRING, OptionValue.createString(OptionType.SYSTEM, name, def), isAdminOption); } } public static class LongValidator extends TypeValidator { public LongValidator(String name, long def) { - super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def)); + this(name, def, false); + } + + public LongValidator(String name, long def, boolean isAdminOption) { + super(name, Kind.LONG, OptionValue.createLong(OptionType.SYSTEM, name, def), isAdminOption); } } public static class DoubleValidator extends TypeValidator { public DoubleValidator(String name, double def) { - super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def)); + this(name, def, false); + } + + public DoubleValidator(String name, double def, boolean isAdminOption) { + super(name, Kind.DOUBLE, OptionValue.createDouble(OptionType.SYSTEM, name, def), isAdminOption); } } @@ -184,22 +200,6 @@ public class TypeValidators { } } - public static class AdminOptionValidator extends StringValidator { - public AdminOptionValidator(String 
name, String def) { - super(name, def); - } - - @Override - public void validate(final OptionValue v, final OptionManager manager) { - if (v.type != OptionType.SYSTEM) { - throw UserException.validationError() - .message("Admin related settings can only be set at SYSTEM level scope. Given scope '%s'.", v.type) - .build(logger); - } - super.validate(v, manager); - } - } - /** * Validator that checks if the given value is included in a list of acceptable values. Case insensitive. */ @@ -229,7 +229,11 @@ public class TypeValidators { private final OptionValue defaultValue; public TypeValidator(final String name, final Kind kind, final OptionValue defValue) { - super(name); + this(name, kind, defValue, false); + } + + public TypeValidator(final String name, final Kind kind, final OptionValue defValue, final boolean isAdminOption) { + super(name, isAdminOption); checkArgument(defValue.type == OptionType.SYSTEM, "Default value must be SYSTEM type."); this.kind = kind; this.defaultValue = defValue; @@ -248,6 +252,11 @@ public class TypeValidators { kind.name(), v.kind.name())) .build(logger); } + if (isAdminOption() && v.type != OptionType.SYSTEM) { + throw UserException.validationError() + .message("Admin related settings can only be set at SYSTEM level scope. 
Given scope '%s'.", v.type) + .build(logger); + } } } } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java index 248c3cb..ea38278 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java @@ -17,11 +17,11 @@ */ package org.apache.drill.exec.store.sys; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; + import java.util.Iterator; import java.util.Map; -import org.apache.drill.common.collections.ImmutableEntry; - public abstract class BasePersistentStore<V> implements PersistentStore<V> { @Override @@ -29,4 +29,18 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> { return getRange(0, Integer.MAX_VALUE); } + /** By default get with version will behave the same way as without version. + * Override this method to add version support. */ + @Override + public V get(String key, DataChangeVersion version) { + return get(key); + } + + /** By default put with version will behave the same way as without version. + * Override this method to add version support. 
*/ + @Override + public void put(String key, V value, DataChangeVersion version) { + put(key, value); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java index 767b1d5..bb23752 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.store.sys; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; + import java.util.Iterator; import java.util.Map; @@ -38,6 +40,14 @@ public interface PersistentStore<V> extends AutoCloseable { V get(String key); /** + * Returns the value for the given key if exists, null otherwise. + * Sets data change version number. + * @param key lookup key + * @param version version holder + */ + V get(String key, DataChangeVersion version); + + /** * Stores the (key, value) tuple in the store. Lifetime of the tuple depends upon store {@link #getMode mode}. * * @param key lookup key @@ -45,6 +55,17 @@ public interface PersistentStore<V> extends AutoCloseable { */ void put(String key, V value); + /** + * Stores the (key, value) tuple in the store. + * If tuple already exits, stores it only if versions match, + * otherwise throws {@link org.apache.drill.exec.exception.VersionMismatchException} + * Lifetime of the tuple depends upon store {@link #getMode mode}. + * + * @param key lookup key + * @param value value to store + * @param version version holder + */ + void put(String key, V value, DataChangeVersion version); /** * Removes the value corresponding to the given key if exists, nothing happens otherwise. 
http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java new file mode 100644 index 0000000..10c1b8f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.sys.store; + +public class DataChangeVersion { + + private int version; + + public void setVersion(int version) { + this.version = version; + } + + public int getVersion() { + return version; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/89f2633f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java index 3dde4b8..55f72c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java @@ -63,7 +63,17 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> { @Override public V get(final String key) { - final byte[] bytes = client.get(key); + return get(key, false, null); + } + + @Override + public V get(final String key, DataChangeVersion version) { + return get(key, true, version); + } + + public V get(final String key, boolean consistencyFlag, DataChangeVersion version) { + byte[] bytes = client.get(key, consistencyFlag, version); + if (bytes == null) { return null; } @@ -76,28 +86,30 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> { @Override public void put(final String key, final V value) { + put(key, value, null); + } + + @Override + public void put(final String key, final V value, DataChangeVersion version) { final InstanceSerializer<V> serializer = config.getSerializer(); try { final byte[] bytes = serializer.serialize(value); - client.put(key, bytes); + client.put(key, bytes, version); } catch (final IOException e) { throw new DrillRuntimeException(String.format("unable to de/serialize value of 
type %s", value.getClass()), e); } } + @Override public boolean putIfAbsent(final String key, final V value) { - final V old = get(key); - if (old == null) { - try { - final byte[] bytes = config.getSerializer().serialize(value); - client.put(key, bytes); - return true; - } catch (final IOException e) { - throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e); - } + try { + final byte[] bytes = config.getSerializer().serialize(value); + final byte[] data = client.putIfAbsent(key, bytes); + return data == null; + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e); } - return false; } @Override