This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch newFunctionService in repository https://gitbox.apache.org/repos/asf/geode.git
commit b31f42123b0371f45bf44081b44ecc130ea45a0e Author: Udo <ukohlme...@pivotal.io> AuthorDate: Thu Feb 22 14:46:11 2018 -0800 Splitting of FunctionService into Client and server completed. Removing of original FunctionService, before removing the import statement to final real failures. --- geode-core/build.gradle | 15 + .../cache/client/function/FunctionService.java | 82 ------ .../geode/cache/client/function/FunctionService.kt | 82 ++++++ .../internal/function/FunctionServiceImpl.java | 136 --------- .../internal/function/FunctionServiceImpl.kt | 124 +++++++++ .../geode/cache/execute/FunctionService.java | 272 ------------------ .../execute/internal/FunctionServiceManager.java | 306 +-------------------- .../geode/cache/function/FunctionService.java | 77 ------ .../apache/geode/cache/function/FunctionService.kt | 76 +++++ .../cache/function/internal/FunctionServiceImpl.kt | 91 ++++++ .../cache/execute/ServerFunctionExecutor.java | 10 +- gradle/wrapper/gradle-wrapper.properties | 4 +- 12 files changed, 398 insertions(+), 877 deletions(-) diff --git a/geode-core/build.gradle b/geode-core/build.gradle index 426840f..fff4401 100755 --- a/geode-core/build.gradle +++ b/geode-core/build.gradle @@ -17,6 +17,7 @@ apply plugin: 'antlr' +apply plugin: 'kotlin' apply plugin: 'me.champeau.gradle.jmh' sourceSets { @@ -154,6 +155,7 @@ dependencies { testRuntime 'xerces:xercesImpl:' + project.'xercesImpl.version' testCompile project(':geode-concurrency-test') + compile "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version" } def generatedResources = "$buildDir/generated-resources/main" @@ -240,4 +242,17 @@ dependencies { } tasks.eclipse.dependsOn(generateGrammarSource) +buildscript { + ext.kotlin_version = '1.2.21' + repositories { + mavenCentral() + } + dependencies { + classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version" + classpath "org.jetbrains.kotlin:kotlin-stdlib-jdk8:$kotlin_version" + } +} +repositories { + mavenCentral() +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/function/FunctionService.java b/geode-core/src/main/java/org/apache/geode/cache/client/function/FunctionService.java deleted file mode 100644 index 0226353..0000000 --- a/geode-core/src/main/java/org/apache/geode/cache/client/function/FunctionService.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.geode.cache.client.function; - -import org.apache.geode.cache.RegionService; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; -import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.common.function.FunctionServiceBase; -import org.apache.geode.cache.execute.Execution; -import org.apache.geode.cache.execute.FunctionException; - -import java.util.Properties; - -public interface FunctionService extends FunctionServiceBase { - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server in the provided {@link Pool}. - * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * @param pool from which to chose a server for execution - * @return Execution - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ - Execution onServer(Pool pool); - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers in the provided {@link Pool}. If one of the servers goes down while dispatching - * or executing the function on the server, an Exception will be thrown. - * @param pool the set of servers to execute the function - * @return Execution - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ - Execution onServers(Pool pool); - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server that the given cache is connected to. - * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)}. - * @return Execution - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ - Execution onServer(RegionService regionService); - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers that the given cache is connected to. If one of the servers goes down while - * dispatching or executing the function on the server, an Exception will be thrown. - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)}. - * @return Execution - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ - Execution onServers(RegionService regionService); -} diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/function/FunctionService.kt b/geode-core/src/main/java/org/apache/geode/cache/client/function/FunctionService.kt new file mode 100644 index 0000000..a06619e --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/client/function/FunctionService.kt @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.cache.client.function + +import org.apache.geode.cache.RegionService +import org.apache.geode.cache.client.ClientCache +import org.apache.geode.cache.client.ClientCacheFactory +import org.apache.geode.cache.client.Pool +import org.apache.geode.cache.common.function.FunctionServiceBase +import org.apache.geode.cache.execute.Execution +import org.apache.geode.cache.execute.FunctionException + +interface FunctionService : FunctionServiceBase { + + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * a server in the provided [Pool]. + * + * + * If the server goes down while dispatching or executing the function, an Exception will be + * thrown. + * @param pool from which to chose a server for execution + * @return Execution + * @throws FunctionException if Pool instance passed in is null + * @since GemFire 6.0 + */ + fun onServer(pool: Pool?): Execution<*, *, *> + + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * all the servers in the provided [Pool]. If one of the servers goes down while dispatching + * or executing the function on the server, an Exception will be thrown. + * @param pool the set of servers to execute the function + * @return Execution + * @throws FunctionException if Pool instance passed in is null + * @since GemFire 6.0 + */ + fun onServers(pool: Pool?): Execution<*, *, *> + + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * a server that the given cache is connected to. + * + * + * If the server goes down while dispatching or executing the function, an Exception will be + * thrown. + * @param regionService obtained from [ClientCacheFactory.create] or + * [ClientCache.createAuthenticatedView]. + * @return Execution + * @throws FunctionException if cache is null, is not on a client, or it does not have a default + * pool + * @since GemFire 6.5 + */ + fun onServer(regionService: RegionService?): Execution<*, *, *> + + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * all the servers that the given cache is connected to. If one of the servers goes down while + * dispatching or executing the function on the server, an Exception will be thrown. + * @param regionService obtained from [ClientCacheFactory.create] or + * [ClientCache.createAuthenticatedView]. + * @return Execution + * @throws FunctionException if cache is null, is not on a client, or it does not have a default + * pool + * @since GemFire 6.5 + */ + fun onServers(regionService: RegionService?): Execution<*, *, *> +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/function/FunctionServiceImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/function/FunctionServiceImpl.java deleted file mode 100644 index 35dce03..0000000 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/function/FunctionServiceImpl.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.geode.cache.client.internal.function; - -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionService; -import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.client.PoolManager; -import org.apache.geode.cache.client.function.FunctionService; -import org.apache.geode.cache.client.internal.ProxyCache; -import org.apache.geode.cache.client.internal.ProxyRegion; -import org.apache.geode.cache.execute.Execution; -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionException; -import org.apache.geode.internal.cache.InternalRegion; -import org.apache.geode.internal.cache.execute.ServerFunctionExecutor; -import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor; -import org.apache.geode.internal.i18n.LocalizedStrings; - -import java.util.Map; - -public class FunctionServiceImpl implements FunctionService { - - @Override - public Execution onRegion(Region region) { - if (region == null) { - throw new FunctionException( - LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Region instance ")); - } - ProxyCache proxyCache = null; - String poolName = region.getAttributes().getPoolName(); - if (poolName != null) { - Pool pool = PoolManager.find(poolName); - if (pool.getMultiuserAuthentication()) { - if (region instanceof ProxyRegion) { - ProxyRegion proxyRegion = (ProxyRegion) region; - region = proxyRegion.getRealRegion(); - proxyCache = proxyRegion.getAuthenticatedCache(); - } else { - throw new UnsupportedOperationException(); - } - } - } - - if (isClientRegion(region)) { - return new ServerRegionFunctionExecutor(region, proxyCache); - } - throw new FunctionException("Cannot find suitable Function Executor"); - } - - @Override - public Execution onServer(Pool pool) { - if (pool == null) { - throw new FunctionException( - LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Pool instance ")); - } - - if (pool.getMultiuserAuthentication()) { - throw new UnsupportedOperationException(); - } - - return new ServerFunctionExecutor(pool, false, null); - } - - @Override - public Execution onServers(Pool pool) { - if (pool == null) { - throw new FunctionException( - LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Pool instance ")); - } - - if (pool.getMultiuserAuthentication()) { - throw new UnsupportedOperationException(); - } - - return new ServerFunctionExecutor(pool, true, null); - } - - @Override - public Execution onServer(RegionService regionService) { - return null; - } - - @Override - public Execution onServers(RegionService regionService) { - return null; - } - - @Override - public Function getFunction(String functionId) { - return null; - } - - @Override - public void registerFunction(Function function) { - - } - - @Override - public void unregisterFunction(String functionId) { - - } - - @Override - public boolean isRegistered(String functionId) { - return false; - } - - @Override - public Map<String, Function> getRegisteredFunctions() { - return null; - } - - /** - * @return true if the method is called on a region has a {@link Pool}. - * @since GemFire 6.0 - */ - private boolean isClientRegion(Region region) { - return ((InternalRegion) region).hasServerProxy(); - - } -} diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/function/FunctionServiceImpl.kt b/geode-core/src/main/java/org/apache/geode/cache/client/internal/function/FunctionServiceImpl.kt new file mode 100644 index 0000000..f5a71fd --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/function/FunctionServiceImpl.kt @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.cache.client.internal.function + +import org.apache.geode.cache.Region +import org.apache.geode.cache.RegionService +import org.apache.geode.cache.client.Pool +import org.apache.geode.cache.client.PoolManager +import org.apache.geode.cache.client.function.FunctionService +import org.apache.geode.cache.client.internal.InternalClientCache +import org.apache.geode.cache.client.internal.ProxyCache +import org.apache.geode.cache.client.internal.ProxyRegion +import org.apache.geode.cache.execute.Execution +import org.apache.geode.cache.execute.Function +import org.apache.geode.cache.execute.FunctionException +import org.apache.geode.cache.execute.internal.FunctionServiceManager +import org.apache.geode.internal.cache.InternalRegion +import org.apache.geode.internal.cache.execute.ServerFunctionExecutor +import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor +import org.apache.geode.internal.i18n.LocalizedStrings + +class FunctionServiceImpl : FunctionService { + private val functionServiceManager = FunctionServiceManager() + + override fun onRegion(region: Region<*, *>?): Execution<*, *, *> { + region?.let { + var proxyCache: ProxyCache? = null + var tmpRegion: Region<*, *> = it + + val poolName = it.attributes.poolName + if (poolName != null) { + val pool = PoolManager.find(poolName) + if (pool.multiuserAuthentication) { + if (it is ProxyRegion) { + tmpRegion = it.realRegion + proxyCache = it.authenticatedCache + } else { + throw UnsupportedOperationException() + } + } + } + if (tmpRegion is InternalRegion && isClientRegion(tmpRegion)) { + return ServerRegionFunctionExecutor(region, proxyCache) + } + throw FunctionException("Cannot find suitable Function Executor") + } + throw FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Region instance ")) + } + + override fun onServer(pool: Pool?): Execution<*, *, *> = + createServerFunctionExecutorForPool(pool, false) + + override fun onServers(pool: Pool?): Execution<*, *, *> = + createServerFunctionExecutorForPool(pool, true) + + override fun onServer(regionService: RegionService?): Execution<*, *, *> = + createServerFunctionExecutorForRegionService(regionService, false) + + override fun onServers(regionService: RegionService?): Execution<*, *, *> = + createServerFunctionExecutorForRegionService(regionService, true) + + private fun createServerFunctionExecutorForPool(pool: Pool?, runOnAllServers: Boolean, proxyCache: ProxyCache? = null): Execution<*, *, *> { + pool?.let { + if (it.multiuserAuthentication) { + throw UnsupportedOperationException() + } + + return ServerFunctionExecutor(pool, runOnAllServers, proxyCache, *returnServerGroupAsTypeArray(it.serverGroup)) + } + throw FunctionException( + LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Pool instance ")) + } + + private fun createServerFunctionExecutorForRegionService(regionService: RegionService?, runOnAllServers: Boolean) + : Execution<*, *, *> { + regionService?.let { + if (it is InternalClientCache) { + if (!it.isClient) { + throw FunctionException("The cache was not a client cache") + } else { + it.defaultPool?.let { + return createServerFunctionExecutorForPool(it, runOnAllServers) + } + throw FunctionException("The client cache does not have a default pool") + } + } else { + val proxyCache = it as ProxyCache + val pool = proxyCache.userAttributes.pool + return createServerFunctionExecutorForPool(pool, runOnAllServers, proxyCache) + } + } + throw FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL + .toLocalizedString("RegionService instance ")) + } + + private fun returnServerGroupAsTypeArray(serverGroup: String): Array<String> = serverGroup.split(",").toTypedArray() + + + override fun getFunction(functionId: String?): Function<*> = functionServiceManager.getFunction(functionId) + override fun registerFunction(function: Function<*>?) = functionServiceManager.registerFunction(function) + override fun unregisterFunction(functionId: String?) = functionServiceManager.unregisterFunction(functionId) + override fun isRegistered(functionId: String?): Boolean = functionServiceManager.isRegistered(functionId) + override fun getRegisteredFunctions(): MutableMap<String, Function<Any>> = functionServiceManager.registeredFunctions + + /** + * @return true if the method is called on a region has a [Pool]. + * @since GemFire 6.0 + */ + private fun isClientRegion(region: InternalRegion): Boolean = region.hasServerProxy() +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java b/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java deleted file mode 100755 index 40ed42e..0000000 --- a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.cache.execute; - -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionService; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; -import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.execute.internal.FunctionServiceManager; -import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.distributed.DistributedSystemDisconnectedException; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.internal.i18n.LocalizedStrings; - -/** - * Provides the entry point into execution of user defined {@linkplain Function}s. - * <p> - * Function execution provides a means to route application behaviour to {@linkplain Region data} or - * more generically to peers in a {@link DistributedSystem} or servers in a {@link Pool}. - * </p> - * - * @since GemFire 6.0 - */ -public class FunctionService { - private static final FunctionServiceManager functionSvcMgr = new FunctionServiceManager(); - - FunctionService() {} - - /** - * Returns an {@link Execution} object that can be used to execute a data dependent function on - * the specified Region.<br> - * When invoked from a GemFire client, the method returns an Execution instance that sends a - * message to one of the connected servers as specified by the {@link Pool} for the region. <br> - * Depending on the filters setup on the {@link Execution}, the function is executed on all - * GemFire members that define the data region, or a subset of members. - * {@link Execution#withFilter(Set)}). - * - * For DistributedRegions with DataPolicy.NORMAL, it throws UnsupportedOperationException. For - * DistributedRegions with DataPolicy.EMPTY, execute the function on any random member which has - * DataPolicy.REPLICATE <br> - * . For DistributedRegions with DataPolicy.REPLICATE, execute the function locally. For Regions - * with DataPolicy.PARTITION, it executes on members where the data resides as specified by the - * filter. - * - * @param region - * @return Execution - * @throws FunctionException if the region passed in is null - * @since GemFire 6.0 - */ - public static Execution onRegion(Region region) { - return functionSvcMgr.onRegion(region); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server in the provided {@link Pool}. - * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * - * @param pool from which to chose a server for execution - * @return Execution - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ - public static Execution onServer(Pool pool) { - return functionSvcMgr.onServer(pool); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers in the provided {@link Pool}. If one of the servers goes down while dispatching - * or executing the function on the server, an Exception will be thrown. - * - * @param pool the set of servers to execute the function - * @return Execution - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ - public static Execution onServers(Pool pool) { - return functionSvcMgr.onServers(pool); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server that the given cache is connected to. - * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)}. - * @return Execution - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ - public static Execution onServer(RegionService regionService) { - return functionSvcMgr.onServer(regionService); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers that the given cache is connected to. If one of the servers goes down while - * dispatching or executing the function on the server, an Exception will be thrown. - * - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)}. - * @return Execution - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ - public static Execution onServers(RegionService regionService) { - return functionSvcMgr.onServers(regionService); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a {@link DistributedMember}. If the member is not found, executing the function will throw an - * Exception. If the member goes down while dispatching or executing the function on the member, - * an Exception will be thrown. - * - * @param distributedMember defines a member in the distributed system - * @return Execution - * @throws FunctionException if distributedMember is null - * @since GemFire 7.0 - * - */ - public static Execution onMember(DistributedMember distributedMember) { - return functionSvcMgr.onMember(getDistributedSystem(), distributedMember); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all peer members. If the optional groups parameter is provided, function is executed on all - * members that belong to the provided groups. - * <p> - * If one of the members goes down while dispatching or executing the function on the member, an - * Exception will be thrown. - * - * @param groups optional list of GemFire configuration property "groups" (see - * <a href="../../distributed/DistributedSystem.html#groups"> <code>groups</code></a>) on - * which to execute the function. Function will be executed on all members of each group - * @return Execution - * - * @throws FunctionException if no members are found belonging to the provided groups - * @since GemFire 7.0 - */ - public static Execution onMembers(String... groups) { - return functionSvcMgr.onMembers(getDistributedSystem(), groups); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * the set of {@link DistributedMember}s. If one of the members goes down while dispatching or - * executing the function, an Exception will be thrown. - * - * @param distributedMembers set of distributed members on which {@link Function} to be executed - * @throws FunctionException if distributedMembers is null - * @since GemFire 7.0 - */ - public static Execution onMembers(Set<DistributedMember> distributedMembers) { - return functionSvcMgr.onMembers(getDistributedSystem(), distributedMembers); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * one member of each group provided. - * - * @param groups list of GemFire configuration property "groups" (see - * <a href="../../distributed/DistributedSystem.html#groups"> <code>groups</code></a>) on - * which to execute the function. Function will be executed on one member of each group - * - * @return Execution - * @throws FunctionException if no members are found belonging to the provided groups - * @since GemFire 7.0 - */ - public static Execution onMember(String... groups) { - return functionSvcMgr.onMember(getDistributedSystem(), groups); - } - - /** - * Returns the {@link Function} defined by the functionId, returns null if no function is found - * for the specified functionId - * - * @param functionId - * @return Function - * @throws FunctionException if functionID passed is null - * @since GemFire 6.0 - */ - public static Function getFunction(String functionId) { - return functionSvcMgr.getFunction(functionId); - } - - /** - * Registers the given {@link Function} with the {@link FunctionService} using - * {@link Function#getId()}. - * <p> - * Registering a function allows execution of the function using - * {@link Execution#execute(String)}. Every member that could execute a function using its - * {@link Function#getId()} should register the function. - * </p> - * - * @throws FunctionException if function instance passed is null or Function.getId() returns null - * @since GemFire 6.0 - */ - public static void registerFunction(Function function) { - functionSvcMgr.registerFunction(function); - } - - /** - * Unregisters the given {@link Function} with the {@link FunctionService} using - * {@link Function#getId()}. - * <p> - * - * @throws FunctionException if function instance passed is null or Function.getId() returns null - * @since GemFire 6.0 - */ - public static void unregisterFunction(String functionId) { - functionSvcMgr.unregisterFunction(functionId); - } - - /** - * Returns true if the function is registered to FunctionService - * - * @throws FunctionException if function instance passed is null or Function.getId() returns null - * @since GemFire 6.0 - */ - public static boolean isRegistered(String functionId) { - return functionSvcMgr.isRegistered(functionId); - } - - - /** - * Returns all locally registered functions - * - * @return A view of registered functions as a Map of {@link Function#getId()} to {@link Function} - * @since GemFire 6.0 - */ - public static Map<String, Function> getRegisteredFunctions() { - return functionSvcMgr.getRegisteredFunctions(); - } - - private static DistributedSystem getDistributedSystem() { - DistributedSystem system = InternalDistributedSystem.getConnectedInstance(); - if (system == null) { - throw new DistributedSystemDisconnectedException( - LocalizedStrings.InternalDistributedSystem_THIS_CONNECTION_TO_A_DISTRIBUTED_SYSTEM_HAS_BEEN_DISCONNECTED - .toLocalizedString()); - } - return system; - } -} diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/internal/FunctionServiceManager.java b/geode-core/src/main/java/org/apache/geode/cache/execute/internal/FunctionServiceManager.java index 682d13c..7f09d4e 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/execute/internal/FunctionServiceManager.java +++ b/geode-core/src/main/java/org/apache/geode/cache/execute/internal/FunctionServiceManager.java @@ -14,44 +14,21 @@ */ package org.apache.geode.cache.execute.internal; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionService; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.client.PoolManager; -import org.apache.geode.cache.client.internal.InternalClientCache; -import org.apache.geode.cache.client.internal.ProxyCache; -import org.apache.geode.cache.client.internal.ProxyRegion; import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; -import org.apache.geode.cache.partition.PartitionRegionHelper; -import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.InternalEntity; -import org.apache.geode.internal.cache.GemFireCacheImpl; -import org.apache.geode.internal.cache.InternalRegion; -import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor; -import org.apache.geode.internal.cache.execute.MemberFunctionExecutor; -import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionExecutor; -import org.apache.geode.internal.cache.execute.ServerFunctionExecutor; -import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor; import org.apache.geode.internal.i18n.LocalizedStrings; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + /** * Provides the entry point into execution of user defined {@linkplain Function}s. * <p> @@ -80,249 +57,6 @@ public class FunctionServiceManager { } /** - * Returns an {@link Execution} object that can be used to execute a data dependent function on - * the specified Region.<br> - * When invoked from a GemFire client, the method returns an Execution instance that sends a - * message to one of the connected servers as specified by the {@link Pool} for the region. <br> - * Depending on the filters setup on the {@link Execution}, the function is executed on all - * GemFire members that define the data region, or a subset of members. - * {@link Execution#withFilter(Set)}). - * - * For DistributedRegions with DataPolicy.NORMAL, it throws UnsupportedOperationException. For - * DistributedRegions with DataPolicy.EMPTY, execute the function on any random member which has - * DataPolicy.REPLICATE <br> - * . For DistributedRegions with DataPolicy.REPLICATE, execute the function locally. For Regions - * with DataPolicy.PARTITION, it executes on members where the data resides as specified by the - * filter. - * - * @return Execution - * @throws FunctionException if the region passed in is null - * @since GemFire 6.0 - */ - public Execution onRegion(Region region) { - if (region == null) { - throw new FunctionException( - LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Region instance ")); - } - - ProxyCache proxyCache = null; - String poolName = region.getAttributes().getPoolName(); - if (poolName != null) { - Pool pool = PoolManager.find(poolName); - if (pool.getMultiuserAuthentication()) { - if (region instanceof ProxyRegion) { - ProxyRegion proxyRegion = (ProxyRegion) region; - region = proxyRegion.getRealRegion(); - proxyCache = proxyRegion.getAuthenticatedCache(); - } else { - throw new UnsupportedOperationException(); - } - } - } - - if (isClientRegion(region)) { - return new ServerRegionFunctionExecutor(region, proxyCache); - } - if (PartitionRegionHelper.isPartitionedRegion(region)) { - return new PartitionedRegionFunctionExecutor(region); - } - return new DistributedRegionFunctionExecutor(region); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server in the provided {@link Pool}. - * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * - * @param pool from which to chose a server for execution - * @return Execution - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ - public Execution onServer(Pool pool, String... groups) { - if (pool == null) { - throw new FunctionException( - LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Pool instance ")); - } - - if (pool.getMultiuserAuthentication()) { - throw new UnsupportedOperationException(); - } - - return new ServerFunctionExecutor(pool, false, groups); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers in the provided {@link Pool}. If one of the servers goes down while dispatching - * or executing the function on the server, an Exception will be thrown. - * - * @param pool the set of servers to execute the function - * @return Execution - * @throws FunctionException if Pool instance passed in is null - * @since GemFire 6.0 - */ - public Execution onServers(Pool pool, String... groups) { - if (pool == null) { - throw new FunctionException( - LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Pool instance ")); - } - - if (pool.getMultiuserAuthentication()) { - throw new UnsupportedOperationException(); - } - - return new ServerFunctionExecutor(pool, true, groups); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a server that the given cache is connected to. - * <p> - * If the server goes down while dispatching or executing the function, an Exception will be - * thrown. - * - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)} . - * @return Execution - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ - public Execution onServer(RegionService regionService, String... groups) { - if (regionService == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("RegionService instance ")); - } - if (regionService instanceof GemFireCacheImpl) { - InternalClientCache internalCache = (InternalClientCache) regionService; - if (!internalCache.isClient()) { - throw new FunctionException("The cache was not a client cache"); - } else if (internalCache.getDefaultPool() != null) { - return onServer(internalCache.getDefaultPool(), groups); - } else { - throw new FunctionException("The client cache does not have a default pool"); - } - } else { - ProxyCache proxyCache = (ProxyCache) regionService; - return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), false, proxyCache, - groups); - } - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all the servers that the given cache is connected to. If one of the servers goes down while - * dispatching or executing the function on the server, an Exception will be thrown. - * - * @param regionService obtained from {@link ClientCacheFactory#create} or - * {@link ClientCache#createAuthenticatedView(Properties)} . - * @return Execution - * @throws FunctionException if cache is null, is not on a client, or it does not have a default - * pool - * @since GemFire 6.5 - */ - public Execution onServers(RegionService regionService, String... groups) { - if (regionService == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("RegionService instance ")); - } - if (regionService instanceof GemFireCacheImpl) { - InternalClientCache internalCache = (InternalClientCache) regionService; - if (!internalCache.isClient()) { - throw new FunctionException("The cache was not a client cache"); - } else if (internalCache.getDefaultPool() != null) { - return onServers(internalCache.getDefaultPool(), groups); - } else { - throw new FunctionException("The client cache does not have a default pool"); - } - } else { - ProxyCache proxyCache = (ProxyCache) regionService; - return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), true, proxyCache, - groups); - } - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a {@link DistributedMember} of the {@link DistributedSystem}. If the member is not found in the - * system, the function execution will throw an Exception. If the member goes down while - * dispatching or executing the function on the member, an Exception will be thrown. - * - * @param system defines the distributed system - * @param distributedMember defines a member in the distributed system - * @return Execution - * @throws FunctionException if either input parameter is null - * @since GemFire 6.0 - * - */ - public Execution onMember(DistributedSystem system, DistributedMember distributedMember) { - if (system == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("DistributedSystem instance ")); - } - if (distributedMember == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("DistributedMember instance ")); - } - return new MemberFunctionExecutor(system, distributedMember); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all members of the {@link DistributedSystem}. If one of the members goes down while dispatching - * or executing the function on the member, an Exception will be thrown. - * - * @param system defines the distributed system - * @return Execution - * - * @throws FunctionException if DistributedSystem instance passed is null - * @since GemFire 6.0 - */ - public Execution onMembers(DistributedSystem system, String... groups) { - if (system == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("DistributedSystem instance ")); - } - if (groups.length == 0) { - return new MemberFunctionExecutor(system); - } - Set<DistributedMember> members = new HashSet<DistributedMember>(); - for (String group : groups) { - members.addAll(system.getGroupMembers(group)); - } - if (members.isEmpty()) { - throw new FunctionException(LocalizedStrings.FunctionService_NO_MEMBERS_FOUND_IN_GROUPS - .toLocalizedString(Arrays.toString(groups))); - } - return new MemberFunctionExecutor(system, members); - } - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * the set of {@link DistributedMember}s of the {@link DistributedSystem}. If one of the members - * goes down while dispatching or executing the function, an Exception will be thrown. - * - * @param system defines the distributed system - * @param distributedMembers set of distributed members on which {@link Function} to be executed - * @throws FunctionException if DistributedSystem instance passed is null - * @since GemFire 6.0 - */ - public Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers) { - if (system == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("DistributedSystem instance ")); - } - if (distributedMembers == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("distributedMembers set ")); - } - return new MemberFunctionExecutor(system, distributedMembers); - } - - /** * Returns the {@link Function} defined by the functionId, returns null if no function is found * for the specified functionId * @@ -421,36 +155,4 @@ public class FunctionServiceManager { unregisterFunction(functionId); } } - - /** - * @return true if the method is called on a region has a {@link Pool}. - * @since GemFire 6.0 - */ - private boolean isClientRegion(Region region) { - return ((InternalRegion) region).hasServerProxy(); - } - - public Execution onMember(DistributedSystem system, String... groups) { - if (system == null) { - throw new FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL - .toLocalizedString("DistributedSystem instance ")); - } - Set<DistributedMember> members = new HashSet<>(); - for (String group : groups) { - List<DistributedMember> grpMembers = new ArrayList<>(system.getGroupMembers(group)); - if (!grpMembers.isEmpty()) { - if (!RANDOM_onMember && grpMembers.contains(system.getDistributedMember())) { - members.add(system.getDistributedMember()); - } else { - Collections.shuffle(grpMembers); - members.add(grpMembers.get(0)); - } - } - } - if (members.isEmpty()) { - throw new FunctionException(LocalizedStrings.FunctionService_NO_MEMBERS_FOUND_IN_GROUPS - .toLocalizedString(Arrays.toString(groups))); - } - return new MemberFunctionExecutor(system, members); - } } diff --git a/geode-core/src/main/java/org/apache/geode/cache/function/FunctionService.java b/geode-core/src/main/java/org/apache/geode/cache/function/FunctionService.java deleted file mode 100644 index 8cfb278..0000000 --- a/geode-core/src/main/java/org/apache/geode/cache/function/FunctionService.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.geode.cache.function; - -import org.apache.geode.cache.common.function.FunctionServiceBase; -import org.apache.geode.cache.execute.Execution; -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionException; -import org.apache.geode.distributed.DistributedMember; - -import java.util.Set; - -public interface FunctionService extends FunctionServiceBase { - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * a {@link DistributedMember}. If the member is not found, executing the function will throw an - * Exception. If the member goes down while dispatching or executing the function on the member, - * an Exception will be thrown. - * @param distributedMember defines a member in the distributed system - * @return Execution - * @throws FunctionException if distributedMember is null - * @since GemFire 7.0 - */ - Execution onMember(DistributedMember distributedMember); - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * all peer members. If the optional groups parameter is provided, function is executed on all - * members that belong to the provided groups. - * <p> - * If one of the members goes down while dispatching or executing the function on the member, an - * Exception will be thrown. - * @param groups optional list of GemFire configuration property "groups" (see - * <a href="../../distributed/DistributedSystem.html#groups"> <code>groups</code></a>) on - * which to execute the function. Function will be executed on all members of each group - * @return Execution - * @throws FunctionException if no members are found belonging to the provided groups - * @since GemFire 7.0 - */ - Execution onMembers(String... groups); - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * the set of {@link DistributedMember}s. If one of the members goes down while dispatching or - * executing the function, an Exception will be thrown. - * @param distributedMembers set of distributed members on which {@link Function} to be executed - * @throws FunctionException if distributedMembers is null - * @since GemFire 7.0 - */ - Execution onMembers(Set<DistributedMember> distributedMembers); - - /** - * Returns an {@link Execution} object that can be used to execute a data independent function on - * one member of each group provided. - * @param groups list of GemFire configuration property "groups" (see - * <a href="../../distributed/DistributedSystem.html#groups"> <code>groups</code></a>) on - * which to execute the function. Function will be executed on one member of each group - * @return Execution - * @throws FunctionException if no members are found belonging to the provided groups - * @since GemFire 7.0 - */ - Execution onMember(String... groups); -} diff --git a/geode-core/src/main/java/org/apache/geode/cache/function/FunctionService.kt b/geode-core/src/main/java/org/apache/geode/cache/function/FunctionService.kt new file mode 100644 index 0000000..0d2e797 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/function/FunctionService.kt @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.cache.function + +import org.apache.geode.cache.common.function.FunctionServiceBase +import org.apache.geode.cache.execute.Execution +import org.apache.geode.cache.execute.Function +import org.apache.geode.cache.execute.FunctionException +import org.apache.geode.distributed.DistributedMember + +interface FunctionService : FunctionServiceBase { + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * a [DistributedMember]. If the member is not found, executing the function will throw an + * Exception. If the member goes down while dispatching or executing the function on the member, + * an Exception will be thrown. + * @param distributedMember defines a member in the distributed system + * @return Execution + * @throws FunctionException if distributedMember is null + * @since GemFire 7.0 + */ + fun onMember(distributedMember: DistributedMember?): Execution<*, *, *> + + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * all peer members. If the optional groups parameter is provided, function is executed on all + * members that belong to the provided groups. + * + * + * If one of the members goes down while dispatching or executing the function on the member, an + * Exception will be thrown. + * @param groups optional list of GemFire configuration property "groups" (see + * [ `groups`](../../distributed/DistributedSystem.html#groups)) on + * which to execute the function. Function will be executed on all members of each group + * @return Execution + * @throws FunctionException if no members are found belonging to the provided groups + * @since GemFire 7.0 + */ + fun onMembers(vararg groups: String?): Execution<*, *, *> + + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * the set of [DistributedMember]s. If one of the members goes down while dispatching or + * executing the function, an Exception will be thrown. + * @param distributedMembers set of distributed members on which [Function] to be executed + * @throws FunctionException if distributedMembers is null + * @since GemFire 7.0 + */ + fun onMembers(distributedMembers: Set<DistributedMember>?): Execution<*, *, *> + + /** + * Returns an [Execution] object that can be used to execute a data independent function on + * one member of each group provided. + * @param groups list of GemFire configuration property "groups" (see + * [ `groups`](../../distributed/DistributedSystem.html#groups)) on + * which to execute the function. Function will be executed on one member of each group + * @return Execution + * @throws FunctionException if no members are found belonging to the provided groups + * @since GemFire 7.0 + */ + fun onMember(vararg groups: String?): Execution<*, *, *> +} diff --git a/geode-core/src/main/java/org/apache/geode/cache/function/internal/FunctionServiceImpl.kt b/geode-core/src/main/java/org/apache/geode/cache/function/internal/FunctionServiceImpl.kt new file mode 100644 index 0000000..9bb294b --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/cache/function/internal/FunctionServiceImpl.kt @@ -0,0 +1,91 @@ +package org.apache.geode.cache.function.internal + +import org.apache.geode.cache.Region +import org.apache.geode.cache.execute.Execution +import org.apache.geode.cache.execute.Function +import org.apache.geode.cache.execute.FunctionException +import org.apache.geode.cache.execute.internal.FunctionServiceManager +import org.apache.geode.cache.execute.internal.FunctionServiceManager.RANDOM_onMember +import org.apache.geode.cache.function.FunctionService +import org.apache.geode.cache.partition.PartitionRegionHelper +import org.apache.geode.distributed.DistributedMember +import org.apache.geode.distributed.DistributedSystem +import org.apache.geode.internal.cache.execute.DistributedRegionFunctionExecutor +import org.apache.geode.internal.cache.execute.MemberFunctionExecutor +import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionExecutor +import org.apache.geode.internal.i18n.LocalizedStrings +import java.util.* + +class FunctionServiceImpl(val distributedSystem: DistributedSystem) : FunctionService { + private val functionServiceManager = FunctionServiceManager() + + override fun onRegion(region: Region<*, *>?): Execution<*, *, *> { + region?.let { + return if (PartitionRegionHelper.isPartitionedRegion(it)) { + PartitionedRegionFunctionExecutor(it) + } else { + DistributedRegionFunctionExecutor(it) + } + } + throw FunctionException( + LocalizedStrings.FunctionService_0_PASSED_IS_NULL.toLocalizedString("Region instance ")) + } + + override fun onMember(distributedMember: DistributedMember?): Execution<*, *, *> { + if (distributedMember == null) { + throw FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL + .toLocalizedString("DistributedMember instance ")) + } + return MemberFunctionExecutor(distributedSystem, distributedMember) + } + + override fun onMembers(vararg groups: String?): Execution<*, *, *> { + if (groups.isEmpty()) { + return MemberFunctionExecutor(distributedSystem) + } + val members = HashSet<DistributedMember>() + for (group in groups) { + members.addAll(distributedSystem.getGroupMembers(group)) + } + if (members.isEmpty()) { + throw FunctionException(LocalizedStrings.FunctionService_NO_MEMBERS_FOUND_IN_GROUPS + .toLocalizedString(Arrays.toString(groups))) + } + return MemberFunctionExecutor(distributedSystem, members) + } + + override fun onMembers(distributedMembers: Set<DistributedMember>?): Execution<*, *, *> { + if (distributedMembers == null) { + throw FunctionException(LocalizedStrings.FunctionService_0_PASSED_IS_NULL + .toLocalizedString("distributedMembers set ")) + } + return MemberFunctionExecutor(distributedSystem, distributedMembers) + } + + override fun onMember(vararg groups: String?): Execution<*, *, *> { + val members = HashSet<DistributedMember>() + for (group in groups) { + val grpMembers = ArrayList<DistributedMember>(distributedSystem.getGroupMembers(group)) + if (!grpMembers.isEmpty()) { + if (!RANDOM_onMember && grpMembers.contains(distributedSystem.distributedMember)) { + members.add(distributedSystem.distributedMember) + } else { + grpMembers.shuffle() + members.add(grpMembers[0]) + } + } + } + if (members.isEmpty()) { + throw FunctionException(LocalizedStrings.FunctionService_NO_MEMBERS_FOUND_IN_GROUPS + .toLocalizedString(Arrays.toString(groups))) + } + return MemberFunctionExecutor(distributedSystem, members) + } + + + override fun getFunction(functionId: String?): Function<*> = functionServiceManager.getFunction(functionId) + override fun registerFunction(function: Function<*>?) = functionServiceManager.registerFunction(function) + override fun unregisterFunction(functionId: String?) = functionServiceManager.unregisterFunction(functionId) + override fun isRegistered(functionId: String?): Boolean = functionServiceManager.isRegistered(functionId) + override fun getRegisteredFunctions(): MutableMap<String, Function<Any>> = functionServiceManager.registeredFunctions +} \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java index d3c47a7..aa362d1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java @@ -46,18 +46,16 @@ public class ServerFunctionExecutor extends AbstractExecution { private String[] groups; - public ServerFunctionExecutor(Pool p, boolean allServers, String... groups) { - this.pool = (PoolImpl) p; + public ServerFunctionExecutor(Pool pool, boolean allServers, String... groups) { + this.pool = (PoolImpl) pool; this.allServers = allServers; this.groups = groups; } - public ServerFunctionExecutor(Pool p, boolean allServers, ProxyCache proxyCache, + public ServerFunctionExecutor(Pool pool, boolean allServers, ProxyCache proxyCache, String... groups) { - this.pool = (PoolImpl) p; - this.allServers = allServers; + this(pool,allServers,groups); this.proxyCache = proxyCache; - this.groups = groups; } public ServerFunctionExecutor(ServerFunctionExecutor sfe) { diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 9c9d53d..ff7a92f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Tue Jul 05 14:26:44 PDT 2016 +#Thu Feb 22 11:03:00 PST 2018 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.5-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.5-all.zip -- To stop receiving notification emails like this one, please contact u...@apache.org.