This is an automated email from the ASF dual-hosted git repository. jagadish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 6d5f446 SAMZA-1531: Support run.id in standalone for batch processing. 6d5f446 is described below commit 6d5f4461d0fa2485ea1274d051f57c5983df5f2e Author: Manasa <mgadup...@linkedin.com> AuthorDate: Wed May 22 14:09:08 2019 -0700 SAMZA-1531: Support run.id in standalone for batch processing. Adds run.id in standalone: only for BATCH mode and no changes for STREAM mode. Known issues: - No support for Azure in this PR. a follow up PR will be made for it. However, this PR does not break Azure/Passthrough and resorts to current behavior for Azure/Passthrough. - upon an unclean shut down of the job, a restart before ZK server session timeout would result in reuse of the old runid. Author: Manasa <mgadup...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org>, Bharath <bkuma...@linkedin.com> Closes #938 from lakshmi-manasa-g/standaloneRunid --- .../samza/coordinator/AzureCoordinationUtils.java | 19 +- .../org/apache/samza/coordinator/AzureLock.java | 11 +- .../samza/coordinator/ClusterMembership.java | 60 ++++ ...ckWithState.java => CoordinationConstants.java} | 26 +- .../samza/coordinator/CoordinationUtils.java | 9 +- .../apache/samza/coordinator/DistributedLock.java | 11 +- .../apache/samza/coordinator/RunIdGenerator.java | 110 +++++++ .../apache/samza/execution/LocalJobPlanner.java | 126 ++++++-- .../samza/runtime/LocalApplicationRunner.java | 93 +++++- .../org/apache/samza/zk/ZkClusterMembership.java | 71 +++++ .../org/apache/samza/zk/ZkCoordinationUtils.java | 10 +- .../samza/zk/ZkCoordinationUtilsFactory.java | 4 +- .../org/apache/samza/zk/ZkDistributedLock.java | 94 +++--- .../apache/samza/zk/ZkMetadataStoreFactory.java | 6 +- .../samza/coordinator/TestRunIdGenerator.java | 93 ++++++ .../samza/execution/TestLocalJobPlanner.java | 34 ++- .../samza/runtime/TestLocalApplicationRunner.java | 122 +++++++- .../apache/samza/zk/TestZkClusterMembership.java | 133 +++++++++ 
.../org/apache/samza/zk/TestZkDistributedLock.java | 126 ++++++++ .../org/apache/samza/zk/TestZkMetadataStore.java | 2 +- .../processor/TestZkLocalApplicationRunner.java | 321 +++++++++++++++++++-- 21 files changed, 1323 insertions(+), 158 deletions(-) diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java index f50ab72..eaac8e5 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java @@ -22,7 +22,6 @@ package org.apache.samza.coordinator; import org.apache.samza.AzureClient; import org.apache.samza.config.AzureConfig; import org.apache.samza.config.Config; -import org.apache.samza.util.BlobUtils; public class AzureCoordinationUtils implements CoordinationUtils { @@ -45,11 +44,21 @@ public class AzureCoordinationUtils implements CoordinationUtils { return null; } + /** + * To support DistributedLock in Azure, even {@link org.apache.samza.metadatastore.MetadataStore} needs to be implemented. + * Because, both of these are used in {@link org.apache.samza.execution.LocalJobPlanner} for intermediate stream creation. + * Currently MetadataStore defaults to ZkMetataStore in LocalJobPlanner due to `metadata.store.factory` not being exposed + * So in order to avoid using AzureLock coupled with ZkMetadataStore, DistributedLock is not supported for Azure + * See SAMZA-2180 for more details. 
+ */ @Override - public DistributedLockWithState getLockWithState(String lockId) { - BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(), - azureConfig.getAzureBlobName() + lockId, azureConfig.getAzureBlobLength()); - return new AzureLock(blob); + public DistributedLock getLock(String lockId) { + throw new UnsupportedOperationException("DistributedLock not supported in Azure!"); + } + + @Override + public ClusterMembership getClusterMembership() { + throw new UnsupportedOperationException("ClusterMembership not supported in Azure!"); } @Override diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java index 8cddc4c..22cdf2e 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java +++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java @@ -19,8 +19,8 @@ package org.apache.samza.coordinator; +import java.time.Duration; import java.util.Random; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.samza.AzureException; @@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory; /** * Distributed lock primitive for Azure. */ -public class AzureLock implements DistributedLockWithState { +public class AzureLock implements DistributedLock { private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class); private static final int LEASE_TIME_IN_SEC = 60; @@ -51,14 +51,13 @@ public class AzureLock implements DistributedLockWithState { * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. * The lock is acquired when the blob is leased successfully. * @param timeout Duration after which timeout occurs. - * @param unit Time Unit of the timeout defined above. 
* @return true if the lock was acquired successfully, false if lock acquire operation is unsuccessful even after subsequent tries within the timeout range. */ @Override - public boolean lockIfNotSet(long timeout, TimeUnit unit) { + public boolean lock(Duration timeout) { //Start timer for timeout long startTime = System.currentTimeMillis(); - long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + long lockTimeout = timeout.toMillis(); Random random = new Random(); while ((System.currentTimeMillis() - startTime) < lockTimeout) { @@ -87,7 +86,7 @@ public class AzureLock implements DistributedLockWithState { * Unlocks, by releasing the lease on the blob. */ @Override - public void unlockAndSet() { + public void unlock() { boolean status = leaseBlobManager.releaseLease(leaseId.get()); if (status) { LOG.info("Unlocked successfully."); diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java b/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java new file mode 100644 index 0000000..45b6fd3 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.coordinator; + +import org.apache.samza.annotation.InterfaceStability; + +/** + * Coordination Primitive to maintain the list of processors in the quorum + * + * Guarantees: + * 1. operations are linearizable + * 2. registration persistence in the absence of connection errors + * + * Non-guarantees: + * 1. thread safe + * 2. concurrent access of the list of processors in the quorum + * 3. persistence of registration across connection errors + * 4. processorId as indicator of registration order + * + * Implementor responsibilities: + * 1. registerProcessor returns a unique processorId + * 2. getNumberOfProcessors by a processor should reflect at least its own registration status + * 3. unregisterProcessor for a null or unregistered processorId is a no-op + */ +@InterfaceStability.Evolving +public interface ClusterMembership { + /** + * add processor to the list of processors in the quorum + * @return unique id of the processor registration + */ + String registerProcessor(); + + /** + * @return number of processors in the list + */ + int getNumberOfProcessors(); + + /** + * remove processor from the list of processors in the quorum + * @param processorId to be removed from the list + */ + void unregisterProcessor(String processorId); +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java similarity index 54% rename from samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java rename to samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java index c8e9033..d7a648b 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java @@ -19,24 +19,12 @@ package org.apache.samza.coordinator; -import 
java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +public final class CoordinationConstants { + private CoordinationConstants() {} -public interface DistributedLockWithState { - - /** - * Try to acquire the lock, but first check if the state flag is set. If it is set, return false. - * If the flag is not set, and lock is acquired - return true. - * @param timeout Duration of lock acquiring timeout. - * @param unit Time Unit of the timeout defined above. - * @return true if lock is acquired successfully, false if state is already set. - * @throws TimeoutException if could not acquire the lock. - */ - boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException; - - /** - * Release the lock and set the state - */ - void unlockAndSet(); -} \ No newline at end of file + public static final String RUNID_STORE_KEY = "runId"; + public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData"; + public static final String RUNID_LOCK_ID = "runId"; + public static final int LOCK_TIMEOUT_MS = 300000; +} diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java index 9ebd2e2..81507c9 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java @@ -24,10 +24,11 @@ import org.apache.samza.annotation.InterfaceStability; * * Coordination service provides synchronization primitives. * The actual implementation (for example ZK based) is left to each implementation class. 
- * This service provide three primitives: + * This service provides the following primitives: * - LeaderElection * - Latch - * - LockWithState (does not lock if state is set) + * - Lock + * - ClusterMembership (to check number of processors in quorum) */ @InterfaceStability.Evolving public interface CoordinationUtils { @@ -37,7 +38,9 @@ public interface CoordinationUtils { Latch getLatch(int size, String latchId); - DistributedLockWithState getLockWithState(String lockId); + DistributedLock getLock(String lockId); + + ClusterMembership getClusterMembership(); /** * utilites cleanup diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java similarity index 78% rename from samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java rename to samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java index 6972cd9..a605656 100644 --- a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java @@ -19,21 +19,20 @@ package org.apache.samza.coordinator; -import java.util.concurrent.TimeUnit; +import java.time.Duration; public interface DistributedLock { /** - * Tries to acquire the lock + * Try to acquire the lock * @param timeout Duration of lock acquiring timeout. - * @param unit Time Unit of the timeout defined above. - * @return true if lock is acquired successfully, false if it times out. 
+ * @return true if lock is acquired successfully else returns false if failed to acquire within timeout */ - boolean lock(long timeout, TimeUnit unit); + boolean lock(Duration timeout); /** - * Releases the lock + * Release the lock */ void unlock(); } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java new file mode 100644 index 0000000..284c0bf --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.coordinator; + +import com.google.common.base.Preconditions; +import java.io.UnsupportedEncodingException; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import org.apache.samza.SamzaException; +import org.apache.samza.metadatastore.MetadataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Generates RunId for Standalone use case + * If there is only one processor in the quorum (registered with ClusterMembership) then create new runid and add to store + * Else read runid from the store + * + * Steps to generate: + * 1. acquire lock + * 2. add self to quorum (register itself with ClusterMembership) + * 3. get number of processors in quorum + * 4. if qurorum size is 1 (only self) then create new runid and write to store + * 5. if quorum size if greater than 1 then read runid from store + * 6. unlock + */ +public class RunIdGenerator { + private static final Logger LOG = LoggerFactory.getLogger(RunIdGenerator.class); + + private final CoordinationUtils coordinationUtils; + private final MetadataStore metadataStore; + private final ClusterMembership clusterMembership; + private String processorId = null; + private volatile boolean closed = false; + + public RunIdGenerator(CoordinationUtils coordinationUtils, MetadataStore metadataStore) { + Preconditions.checkNotNull(coordinationUtils, "CoordinationUtils cannot be null"); + Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null"); + this.coordinationUtils = coordinationUtils; + this.metadataStore = metadataStore; + this.clusterMembership = coordinationUtils.getClusterMembership(); + Preconditions.checkNotNull(this.clusterMembership, "Failed to create utils for run id generation"); + } + + public Optional<String> getRunId() { + DistributedLock runIdLock; + String runId = null; + + runIdLock = coordinationUtils.getLock(CoordinationConstants.RUNID_LOCK_ID); + if (runIdLock == null) { + throw new 
SamzaException("Failed to create utils for run id generation"); + } + + try { + // acquire lock to write or read run.id + if (runIdLock.lock(Duration.ofMillis(CoordinationConstants.LOCK_TIMEOUT_MS))) { + LOG.info("lock acquired for run.id generation by this processor"); + processorId = clusterMembership.registerProcessor(); + int numberOfActiveProcessors = clusterMembership.getNumberOfProcessors(); + if (numberOfActiveProcessors == 0) { + String msg = String.format("Processor failed to fetch number of processors for run.id generation"); + throw new SamzaException(msg); + } + if (numberOfActiveProcessors == 1) { + runId = + String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8); + LOG.info("Writing the run id for this run as {}", runId); + metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8")); + } else { + runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY)); + LOG.info("Read the run id for this run as {}", runId); + } + runIdLock.unlock(); + } else { + throw new SamzaException("Processor timed out waiting to acquire lock for run.id generation"); + } + } catch (UnsupportedEncodingException e) { + throw new SamzaException("Processor could not serialize/deserialize string for run.id generation", e); + } + return Optional.ofNullable(runId); + } + + /** + * might be called several times and hence should be idempotent + */ + public void close() { + if (!closed && processorId != null) { + closed = true; + clusterMembership.unregisterProcessor(processorId); + } + } +} diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java index 5cce6c5..48a4a3e 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java @@ -18,8 +18,9 @@ */ package org.apache.samza.execution; +import 
java.io.UnsupportedEncodingException; +import java.time.Duration; import java.util.List; -import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.samza.SamzaException; @@ -28,9 +29,15 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorImpl; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; +import org.apache.samza.coordinator.CoordinationConstants; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.DistributedLockWithState; +import org.apache.samza.coordinator.DistributedLock; +import org.apache.samza.metadatastore.MetadataStore; +import org.apache.samza.metadatastore.MetadataStoreFactory; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.system.StreamSpec; +import org.apache.samza.util.Util; +import org.apache.samza.zk.ZkMetadataStoreFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,19 +50,35 @@ import org.slf4j.LoggerFactory; */ public class LocalJobPlanner extends JobPlanner { private static final Logger LOG = LoggerFactory.getLogger(LocalJobPlanner.class); - private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData"; + private static final String STREAM_CREATION_METADATA_STORE = "StreamCreationCoordinationStore"; + private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory"; + public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName(); + private static final String STREAM_CREATED_STATE_KEY = "StreamCreated_%s"; - private final String uid = UUID.randomUUID().toString(); + private final String processorId; + private final CoordinationUtils coordinationUtils; + private final String runId; - public LocalJobPlanner(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> descriptor) { + public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor, String processorId) { super(descriptor); + this.processorId = processorId; + JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig); + this.coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, processorId, userConfig); + this.runId = null; + } + + public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor, CoordinationUtils coordinationUtils, String processorId, String runId) { + super(descriptor); + this.coordinationUtils = coordinationUtils; + this.processorId = processorId; + this.runId = runId; } @Override public List<JobConfig> prepareJobs() { // for high-level DAG, generating the plan and job configs // 1. initialize and plan - ExecutionPlan plan = getExecutionPlan(); + ExecutionPlan plan = getExecutionPlan(runId); String executionPlanJson = ""; try { @@ -104,36 +127,89 @@ public class LocalJobPlanner extends JobPlanner { LOG.info("Set of intermediate streams is empty. Nothing to create."); return; } - LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid); + LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", processorId); // Move the scope of coordination utils within stream creation to address long idle connection problem. // Refer SAMZA-1385 for more details - JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig); - String coordinationId = new ApplicationConfig(userConfig).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX; - CoordinationUtils coordinationUtils = - jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, userConfig); if (coordinationUtils == null) { - LOG.warn("Processor {} failed to create utils. 
Each processor will attempt to create streams.", uid); + LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", processorId); // each application process will try creating the streams, which // requires stream creation to be idempotent streamManager.createStreams(intStreams); return; } - DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId); + // If BATCH, then need to create new intermediate streams every run. + // planId does not change every run and hence, need to use runid + // as the lockId to create a new lock with state each run + // to create new streams each run. + // If run.id is null, defaults to old behavior of using planId + boolean isAppModeBatch = new ApplicationConfig(userConfig).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + String lockId = planId; + if (isAppModeBatch && runId != null) { + lockId = runId; + } try { - // check if the processor needs to go through leader election and stream creation - if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) { - LOG.info("lock acquired for streams creation by " + uid); - streamManager.createStreams(intStreams); - lockWithState.unlockAndSet(); - } else { - LOG.info("Processor {} did not obtain the lock for streams creation. 
They must've been created by another processor.", uid); - } - } catch (TimeoutException e) { - String msg = String.format("Processor {} failed to get the lock for stream initialization", uid); - throw new SamzaException(msg, e); + checkAndCreateStreams(lockId, intStreams, streamManager); + } catch (TimeoutException te) { + throw new SamzaException(String.format("Processor {} failed to get the lock for stream initialization within timeout.", processorId), te); } finally { - coordinationUtils.close(); + if (!isAppModeBatch && coordinationUtils != null) { + coordinationUtils.close(); + } + } + } + + private void checkAndCreateStreams(String lockId, List<StreamSpec> intStreams, StreamManager streamManager) throws TimeoutException { + MetadataStore metadataStore = getMetadataStore(); + DistributedLock distributedLock = coordinationUtils.getLock(lockId); + if (distributedLock == null || metadataStore == null) { + LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", processorId); + // each application process will try creating the streams, which requires stream creation to be idempotent + streamManager.createStreams(intStreams); + return; + } + //Start timer for timeout + long startTime = System.currentTimeMillis(); + long lockTimeout = TimeUnit.MILLISECONDS.convert(CoordinationConstants.LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + // If "stream created state" exists in store then skip stream creation + // Else acquire lock, create streams, set state in store and unlock + // Checking for state before acquiring lock to prevent all processors from acquiring lock + // In a while loop so that if two processors check state simultaneously then + // to make sure the processor not acquiring the lock + // does not die of timeout exception and comes back and checks for state and proceeds + while ((System.currentTimeMillis() - startTime) < lockTimeout) { + if (metadataStore.get(String.format(STREAM_CREATED_STATE_KEY, lockId)) != null) { + 
LOG.info("Processor {} found streams created state data. They must've been created by another processor.", processorId); + break; + } + try { + if (distributedLock.lock(Duration.ofMillis(10000))) { + LOG.info("lock acquired for streams creation by Processor " + processorId); + streamManager.createStreams(intStreams); + String streamCreatedMessage = "Streams created by processor " + processorId; + metadataStore.put(String.format(STREAM_CREATED_STATE_KEY, lockId), streamCreatedMessage.getBytes("UTF-8")); + distributedLock.unlock(); + break; + } else { + LOG.info("Processor {} failed to get the lock for stream initialization. Will try again until time out", processorId); + } + } catch (UnsupportedEncodingException e) { + String msg = String.format("Processor {} failed to encode string for stream initialization", processorId); + throw new SamzaException(msg, e); + } + } + if ((System.currentTimeMillis() - startTime) >= lockTimeout) { + throw new TimeoutException(String.format("Processor {} failed to get the lock for stream initialization within {} milliseconds.", processorId, CoordinationConstants.LOCK_TIMEOUT_MS)); + } + } + + private MetadataStore getMetadataStore() { + String metadataStoreFactoryClass = appDesc.getConfig().get(METADATA_STORE_FACTORY_CONFIG); + if (metadataStoreFactoryClass == null) { + metadataStoreFactoryClass = DEFAULT_METADATA_STORE_FACTORY; } + MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); + return metadataStoreFactory.getMetadataStore(STREAM_CREATION_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap()); } } diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java index 7da3369..c0b85f2 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java 
@@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -42,14 +43,22 @@ import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; +import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.context.ExternalContext; +import org.apache.samza.coordinator.CoordinationConstants; +import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.RunIdGenerator; import org.apache.samza.execution.LocalJobPlanner; import org.apache.samza.job.ApplicationStatus; +import org.apache.samza.metadatastore.MetadataStore; +import org.apache.samza.metadatastore.MetadataStoreFactory; +import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.MetricsReporter; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.task.TaskFactory; import org.apache.samza.task.TaskFactoryUtil; import org.apache.samza.util.Util; +import org.apache.samza.zk.ZkMetadataStoreFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +68,20 @@ import org.slf4j.LoggerFactory; public class LocalApplicationRunner implements ApplicationRunner { private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class); + private static final String PROCESSOR_ID = UUID.randomUUID().toString(); + private final static String RUN_ID_METADATA_STORE = "RunIdCoordinationStore"; + private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory"; + public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName(); private final ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc; - private final LocalJobPlanner planner; private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet(); private final CountDownLatch shutdownLatch = new CountDownLatch(1); private final AtomicInteger numProcessorsToStart = new AtomicInteger(); private final AtomicReference<Throwable> failure = new AtomicReference<>(); + private final boolean isAppModeBatch; + private final Optional<CoordinationUtils> coordinationUtils; + private Optional<String> runId = Optional.empty(); + private Optional<RunIdGenerator> runIdGenerator = Optional.empty(); private ApplicationStatus appStatus = ApplicationStatus.New; @@ -77,20 +93,74 @@ public class LocalApplicationRunner implements ApplicationRunner { */ public LocalApplicationRunner(SamzaApplication app, Config config) { this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config); - this.planner = new LocalJobPlanner(appDesc); + isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + coordinationUtils = getCoordinationUtils(config); } /** * Constructor only used in unit test to allow injection of {@link LocalJobPlanner} */ @VisibleForTesting - LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, LocalJobPlanner planner) { + LocalApplicationRunner(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) { this.appDesc = appDesc; - this.planner = planner; + isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + this.coordinationUtils = coordinationUtils; + } + + private Optional<CoordinationUtils> getCoordinationUtils(Config config) { + if (!isAppModeBatch) { + return Optional.empty(); + } + JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config); + CoordinationUtils coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config); + return Optional.ofNullable(coordinationUtils); + } + + /** + * @return LocalJobPlanner created + */ + @VisibleForTesting + LocalJobPlanner getPlanner() { + boolean isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH; + if (!isAppModeBatch) { + return new LocalJobPlanner(appDesc, PROCESSOR_ID); + } + CoordinationUtils coordinationUtils = this.coordinationUtils.orElse(null); + String runId = this.runId.orElse(null); + return new LocalJobPlanner(appDesc, coordinationUtils, PROCESSOR_ID, runId); + } + + + private void initializeRunId() { + if (!isAppModeBatch) { + LOG.info("Not BATCH mode and hence not generating run id"); + return; + } + + if (!coordinationUtils.isPresent()) { + LOG.warn("Coordination utils not present. Aborting run id generation. Will continue execution without a run id."); + return; + } + + try { + MetadataStore metadataStore = getMetadataStore(); + runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore)); + runId = runIdGenerator.flatMap(RunIdGenerator::getRunId); + } catch (Exception e) { + LOG.warn("Failed to generate run id. Will continue execution without a run id. 
Caused by {}", e); + } + } + + public Optional<String> getRunId() { + return this.runId; } @Override public void run(ExternalContext externalContext) { + initializeRunId(); + + LocalJobPlanner planner = getPlanner(); + try { List<JobConfig> jobConfigs = planner.prepareJobs(); @@ -109,6 +179,7 @@ public class LocalApplicationRunner implements ApplicationRunner { // start the StreamProcessors processors.forEach(StreamProcessor::start); } catch (Throwable throwable) { + cleanup(); appStatus = ApplicationStatus.unsuccessfulFinish(throwable); shutdownLatch.countDown(); throw new SamzaException(String.format("Failed to start application: %s", @@ -119,6 +190,7 @@ public class LocalApplicationRunner implements ApplicationRunner { @Override public void kill() { processors.forEach(StreamProcessor::stop); + cleanup(); } @Override @@ -151,6 +223,7 @@ public class LocalApplicationRunner implements ApplicationRunner { throw new SamzaException(e); } + cleanup(); return finished; } @@ -200,6 +273,17 @@ public class LocalApplicationRunner implements ApplicationRunner { } } + private void cleanup() { + runIdGenerator.ifPresent(RunIdGenerator::close); + coordinationUtils.ifPresent(CoordinationUtils::close); + } + + private MetadataStore getMetadataStore() { + String metadataStoreFactoryClass = appDesc.getConfig().getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY); + MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class); + return metadataStoreFactory.getMetadataStore(RUN_ID_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap()); + } + /** * Defines a specific implementation of {@link ProcessorLifecycleListener} for local {@link StreamProcessor}s. */ @@ -262,6 +346,7 @@ public class LocalApplicationRunner implements ApplicationRunner { userDefinedProcessorLifecycleListener.afterStop(); } if (processors.isEmpty()) { + cleanup(); // no processor is still running. 
Notify callers waiting on waitForFinish() shutdownLatch.countDown(); } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java b/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java new file mode 100644 index 0000000..0690821 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.zk; + +import com.google.common.base.Preconditions; +import org.apache.samza.coordinator.ClusterMembership; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZkClusterMembership implements ClusterMembership { + + public static final Logger LOG = LoggerFactory.getLogger(ZkClusterMembership.class); + public static final String PROCESSORS_PATH = "processors"; + private final ZkUtils zkUtils; + private final String processorsPath; + private final String participantId; + private final ZkKeyBuilder keyBuilder; + + public ZkClusterMembership(String participantId, ZkUtils zkUtils) { + Preconditions.checkNotNull(participantId, "ParticipantId cannot be null"); + Preconditions.checkNotNull(zkUtils, "ZkUtils cannot be null"); + this.zkUtils = zkUtils; + this.participantId = participantId; + this.keyBuilder = zkUtils.getKeyBuilder(); + processorsPath = String.format("%s/%s", keyBuilder.getRootPath(), PROCESSORS_PATH); + zkUtils.validatePaths(new String[] {processorsPath}); + } + + @Override + public String registerProcessor() { + String nodePath = zkUtils.getZkClient().createEphemeralSequential(processorsPath + "/", participantId); + LOG.info("created ephemeral node. Registered the processor in the cluster."); + return ZkKeyBuilder.parseIdFromPath(nodePath); + } + + @Override + public int getNumberOfProcessors() { + return zkUtils.getZkClient().getChildren(processorsPath).size(); + } + + @Override + public void unregisterProcessor(String processorId) { + if (processorId == null) { + LOG.warn("Can not unregister processor with null processorId"); + return; + } + String nodePath = processorsPath + "/" + processorId; + if (zkUtils.exists(nodePath)) { + zkUtils.getZkClient().delete(nodePath); + LOG.info("Ephemeral node deleted. Unregistered the processor from cluster membership."); + } else { + LOG.warn("Ephemeral node you want to delete doesnt exist. 
Processor with id {} is not currently registered.", processorId); + } + } +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java index 3d4a2d1..1e3b58f 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java @@ -20,8 +20,9 @@ package org.apache.samza.zk; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.samza.config.ZkConfig; +import org.apache.samza.coordinator.ClusterMembership; import org.apache.samza.coordinator.CoordinationUtils; -import org.apache.samza.coordinator.DistributedLockWithState; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.coordinator.Latch; import org.apache.samza.coordinator.LeaderElector; import org.slf4j.Logger; @@ -52,7 +53,7 @@ public class ZkCoordinationUtils implements CoordinationUtils { } @Override - public DistributedLockWithState getLockWithState(String lockId) { + public DistributedLock getLock(String lockId) { return new ZkDistributedLock(processorIdStr, zkUtils, lockId); } @@ -70,4 +71,9 @@ public class ZkCoordinationUtils implements CoordinationUtils { public ZkUtils getZkUtils() { return zkUtils; } + + @Override + public ClusterMembership getClusterMembership() { + return new ZkClusterMembership(processorIdStr, zkUtils); + } } diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java index e93f290..0cf93c9 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java @@ -21,6 +21,7 @@ package org.apache.samza.zk; import com.google.common.base.Strings; import org.I0Itec.zkclient.ZkClient; import 
org.apache.samza.SamzaException; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationUtils; @@ -34,7 +35,8 @@ import org.slf4j.LoggerFactory; public class ZkCoordinationUtilsFactory implements CoordinationUtilsFactory { private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtilsFactory.class); - public CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) { + public CoordinationUtils getCoordinationUtils(String coordinationId, String participantId, Config config) { + String groupId = new ApplicationConfig(config).getGlobalAppId() + "/" + coordinationId; ZkConfig zkConfig = new ZkConfig(config); ZkClient zkClient = diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java index cfb4641..24aa26e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java @@ -18,12 +18,10 @@ */ package org.apache.samza.zk; +import java.time.Duration; import java.util.List; -import java.util.Random; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import org.apache.samza.SamzaException; -import org.apache.samza.coordinator.DistributedLockWithState; +import org.apache.samza.coordinator.DistributedLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +29,7 @@ import org.slf4j.LoggerFactory; /** * Distributed lock primitive for Zookeeper. 
*/ -public class ZkDistributedLock implements DistributedLockWithState { +public class ZkDistributedLock implements DistributedLock { public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class); private static final String STATE_INITED = "sate_initialized"; @@ -39,72 +37,65 @@ public class ZkDistributedLock implements DistributedLockWithState { private final String lockPath; private final String participantId; private final ZkKeyBuilder keyBuilder; - private final Random random = new Random(); private String nodePath = null; - private final String statePath; + private Object mutex; public ZkDistributedLock(String participantId, ZkUtils zkUtils, String lockId) { this.zkUtils = zkUtils; this.participantId = participantId; this.keyBuilder = zkUtils.getKeyBuilder(); - lockPath = String.format("%s/stateLock_%s", keyBuilder.getRootPath(), lockId); - statePath = String.format("%s/%s_%s", lockPath, STATE_INITED, lockId); + lockPath = String.format("%s/lock_%s", keyBuilder.getRootPath(), lockId); zkUtils.validatePaths(new String[] {lockPath}); + mutex = new Object(); + zkUtils.getZkClient().subscribeChildChanges(lockPath, new ParticipantChangeHandler(zkUtils)); } /** * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out. * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock. * @param timeout Duration of lock acquiring timeout. - * @param unit Unit of the timeout defined above. - * @return true if lock is acquired successfully, false if it times out. 
+ * @return true if lock is acquired successfully else returns false if failed to acquire within timeout */ @Override - public boolean lockIfNotSet(long timeout, TimeUnit unit) - throws TimeoutException { + public boolean lock(Duration timeout) { nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId); //Start timer for timeout long startTime = System.currentTimeMillis(); - long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit); + long lockTimeout = timeout.toMillis(); while ((System.currentTimeMillis() - startTime) < lockTimeout) { + synchronized (mutex) { + List<String> children = zkUtils.getZkClient().getChildren(lockPath); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); - if (zkUtils.getZkClient().exists(statePath)) { - // state already set, no point locking - return false; - } - - List<String> children = zkUtils.getZkClient().getChildren(lockPath); - int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath)); - - if (children.size() == 0 || index == -1) { - throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); - } - // Acquires lock when the node has the lowest sequence number and returns. - if (index == 0) { - LOG.info("Acquired lock for participant id: {}", participantId); - return true; - } else { - try { - Thread.sleep(random.nextInt(1000)); - } catch (InterruptedException e) { - Thread.interrupted(); + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); + } + // Acquires lock when the node has the lowest sequence number and returns. 
+ if (index == 0) { + LOG.info("Acquired lock for participant id: {}", participantId); + return true; + } else { + try { + mutex.wait(lockTimeout); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOG.info("Trying to acquire lock again..."); } - LOG.info("Trying to acquire lock again..."); } } - throw new TimeoutException("could not acquire lock for " + timeout + " " + unit.toString()); + LOG.info("Failed to acquire lock within {} milliseconds.", lockTimeout); + return false; } /** * Unlocks, by deleting the ephemeral sequential node created to acquire the lock. */ @Override - public void unlockAndSet() { - // set state - zkUtils.getZkClient().createPersistent(statePath, true); + public void unlock() { if (nodePath != null) { zkUtils.getZkClient().delete(nodePath); @@ -114,4 +105,29 @@ public class ZkDistributedLock implements DistributedLockWithState { LOG.warn("Ephemeral lock node you want to delete doesn't exist"); } } + + /** + * Listener for changes in children of LOCK + * children are the ephemeral nodes created to acquire the lock + */ + class ParticipantChangeHandler extends ZkUtils.GenerationAwareZkChildListener { + + public ParticipantChangeHandler(ZkUtils zkUtils) { + super(zkUtils, "ParticipantChangeHandler"); + } + + // Called when the children of the given path changed. 
+ @Override + public void doHandleChildChange(String parentPath, List<String> currentChildren) + throws Exception { + synchronized (mutex) { + if (currentChildren == null) { + LOG.warn("handleChildChange on path " + parentPath + " was invoked with NULL list of children"); + } else { + LOG.info("ParticipantChangeHandler::handleChildChange - Path: {} Current Children: {} ", parentPath, currentChildren); + mutex.notify(); + } + } + } + } } \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java index a9c979d..5aaa261 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java @@ -18,7 +18,9 @@ */ package org.apache.samza.zk; +import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; +import org.apache.samza.coordinator.CoordinationConstants; import org.apache.samza.metadatastore.MetadataStore; import org.apache.samza.metadatastore.MetadataStoreFactory; import org.apache.samza.metrics.MetricsRegistry; @@ -31,6 +33,8 @@ public class ZkMetadataStoreFactory implements MetadataStoreFactory { @Override public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) { - return new ZkMetadataStore(namespace, config, metricsRegistry); + String globalAppId = new ApplicationConfig(config).getGlobalAppId(); + String metadataStoreBaseDir = "/" + globalAppId + "/" + CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX + "/" + namespace; + return new ZkMetadataStore(metadataStoreBaseDir, config, metricsRegistry); } } diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java new file mode 100644 index 0000000..39bc583 --- /dev/null +++ 
b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.coordinator; + +import org.apache.samza.metadatastore.MetadataStore; +import org.junit.Test; +import org.mockito.Mockito; + + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.*; + +public class TestRunIdGenerator { + + private static final String FAKE_RUNID = "FAKE_RUNID"; + private RunIdGenerator runIdGenerator; + private CoordinationUtils coordinationUtils; + private DistributedLock distributedLock; + private ClusterMembership membership; + private MetadataStore metadataStore; + + @Test + public void testSingleProcessorWriteRunId() throws Exception { + // When there is a single processor registered with ClusterMembership + // RunIdGenerator should write a new run id to the MetadataStore + + prepareRunIdGenerator(1); + + runIdGenerator.getRunId(); + + verify(coordinationUtils, Mockito.times(1)).getClusterMembership(); + verify(coordinationUtils, Mockito.times(1)).getLock(anyString()); + verify(distributedLock, Mockito.times(1)).lock(anyObject()); + verify(distributedLock, Mockito.times(1)).unlock(); + 
verify(membership, Mockito.times(1)).registerProcessor(); + verify(membership, Mockito.times(1)).getNumberOfProcessors(); + verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class)); + } + + @Test + public void testTwoProcessorsReadRunId() throws Exception { + // When there are two processors registered with ClusterMembership + // RunIdGenerator should read run id from the MetadataStore + + prepareRunIdGenerator(2); + + String runId = runIdGenerator.getRunId().get(); + + assertEquals("Runid was not read from store", runId, FAKE_RUNID); + + verify(coordinationUtils, Mockito.times(1)).getClusterMembership(); + verify(coordinationUtils, Mockito.times(1)).getLock(anyString()); + verify(distributedLock, Mockito.times(1)).lock(anyObject()); + verify(distributedLock, Mockito.times(1)).unlock(); + verify(membership, Mockito.times(1)).registerProcessor(); + verify(membership, Mockito.times(1)).getNumberOfProcessors(); + verify(metadataStore, Mockito.times(1)).get(CoordinationConstants.RUNID_STORE_KEY); + } + + private void prepareRunIdGenerator(int numberOfProcessors) throws Exception { + + coordinationUtils = mock(CoordinationUtils.class); + + distributedLock = mock(DistributedLock.class); + when(distributedLock.lock(anyObject())).thenReturn(true); + when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock); + + membership = mock(ClusterMembership.class); + when(membership.getNumberOfProcessors()).thenReturn(numberOfProcessors); + when(coordinationUtils.getClusterMembership()).thenReturn(membership); + + metadataStore = mock(MetadataStore.class); + when(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY)).thenReturn(FAKE_RUNID.getBytes("UTF-8")); + + runIdGenerator = spy(new RunIdGenerator(coordinationUtils, metadataStore)); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java index 
5a9b634..7e2ca08 100644 --- a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java +++ b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java @@ -30,8 +30,10 @@ import org.apache.samza.config.JobConfig; import org.apache.samza.config.JobCoordinatorConfig; import org.apache.samza.coordinator.CoordinationUtils; import org.apache.samza.coordinator.CoordinationUtilsFactory; -import org.apache.samza.coordinator.DistributedLockWithState; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.system.StreamSpec; +import org.apache.samza.zk.ZkMetadataStore; +import org.apache.samza.zk.ZkMetadataStoreFactory; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; @@ -42,7 +44,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; @@ -58,7 +59,7 @@ import static org.mockito.Mockito.when; * TODO: consolidate this with unit tests for ExecutionPlanner after SAMZA-1811 */ @RunWith(PowerMockRunner.class) -@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class}) +@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class, ZkMetadataStoreFactory.class}) public class TestLocalJobPlanner { private static final String PLAN_JSON = @@ -73,7 +74,9 @@ public class TestLocalJobPlanner { @Test public void testStreamCreation() throws Exception { - localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class)); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + doReturn(mock(Config.class)).when(appDesc).getConfig(); + localPlanner = createLocalJobPlanner(appDesc); StreamManager streamManager = 
mock(StreamManager.class); doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class)); @@ -102,7 +105,9 @@ public class TestLocalJobPlanner { @Test public void testStreamCreationWithCoordination() throws Exception { - localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class)); + StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class); + doReturn(mock(Config.class)).when(appDesc).getConfig(); + localPlanner = createLocalJobPlanner(appDesc); StreamManager streamManager = mock(StreamManager.class); doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class)); @@ -118,9 +123,9 @@ public class TestLocalJobPlanner { when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory); PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig); - DistributedLockWithState lock = mock(DistributedLockWithState.class); - when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true); - when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock); + DistributedLock lock = mock(DistributedLock.class); + when(lock.lock(anyObject())).thenReturn(true); + when(coordinationUtils.getLock(anyString())).thenReturn(lock); when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject())) .thenReturn(coordinationUtils); @@ -191,8 +196,17 @@ public class TestLocalJobPlanner { planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs))); } - private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) { - return spy(new LocalJobPlanner(appDesc)); + private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc) throws Exception { + CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); + DistributedLock distributedLock = mock(DistributedLock.class); + when(distributedLock.lock(anyObject())).thenReturn(true); + when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock); + + ZkMetadataStore zkMetadataStore = mock(ZkMetadataStore.class); + when(zkMetadataStore.get(any())).thenReturn(null); + PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(zkMetadataStore); + + return spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID")); } private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) { diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java index 5e91a2a..3ccf587 100644 --- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java +++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java @@ -37,42 +37,54 @@ import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MapConfig; import org.apache.samza.context.ExternalContext; +import org.apache.samza.coordinator.ClusterMembership; +import org.apache.samza.coordinator.CoordinationConstants; +import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.DistributedLock; import org.apache.samza.job.ApplicationStatus; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.execution.LocalJobPlanner; import org.apache.samza.task.IdentityStreamTask; +import org.apache.samza.zk.ZkMetadataStore; +import org.apache.samza.zk.ZkMetadataStoreFactory; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import 
org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.*; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.*; +@RunWith(PowerMockRunner.class) +@PrepareForTest({LocalJobPlanner.class, LocalApplicationRunner.class, ZkMetadataStoreFactory.class}) public class TestLocalApplicationRunner { private Config config; private SamzaApplication mockApp; private LocalApplicationRunner runner; private LocalJobPlanner localPlanner; + private CoordinationUtils coordinationUtils; + private ZkMetadataStore metadataStore; + private ClusterMembership clusterMembership; @Before - public void setUp() { + public void setUp() throws Exception { config = new MapConfig(); mockApp = mock(StreamApplication.class); prepareTest(); } @Test - public void testRunStreamTask() { + public void testRunStreamTask() throws Exception { final Map<String, String> cfgs = new HashMap<>(); cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); cfgs.put(ApplicationConfig.APP_NAME, "test-app"); @@ -105,7 +117,7 @@ public class TestLocalApplicationRunner { } @Test - public void testRunStreamTaskWithoutExternalContext() { + public void testRunStreamTaskWithoutExternalContext() throws Exception { final Map<String, String> cfgs = new HashMap<>(); cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); cfgs.put(ApplicationConfig.APP_NAME, "test-app"); @@ -136,7 +148,7 @@ public class TestLocalApplicationRunner { } @Test - public void testRunComplete() { + public void 
testRunComplete() throws Exception { Map<String, String> cfgs = new HashMap<>(); cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); config = new MapConfig(cfgs); @@ -172,7 +184,7 @@ public class TestLocalApplicationRunner { } @Test - public void testRunFailure() { + public void testRunFailure() throws Exception { Map<String, String> cfgs = new HashMap<>(); cfgs.put(ApplicationConfig.PROCESSOR_ID, "0"); config = new MapConfig(cfgs); @@ -247,11 +259,97 @@ public class TestLocalApplicationRunner { LocalApplicationRunner.createProcessorId(mockConfig); } - private void prepareTest() { + private void prepareTest() throws Exception { + CoordinationUtils coordinationUtils = mock(CoordinationUtils.class); + + DistributedLock distributedLock = mock(DistributedLock.class); + when(distributedLock.lock(anyObject())).thenReturn(true); + when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock); + + ZkMetadataStore zkMetadataStore = mock(ZkMetadataStore.class); + when(zkMetadataStore.get(any())).thenReturn(null); + PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(zkMetadataStore); + + ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc = + ApplicationDescriptorUtil.getAppDescriptor(mockApp, config); + localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID")); + runner = spy(new LocalApplicationRunner(appDesc, Optional.of(coordinationUtils))); + doReturn(localPlanner).when(runner).getPlanner(); + } + + /** + * For app.mode=BATCH ensure that the run.id generation utils -- + * DistributedLock, ClusterMembership and MetadataStore are created. 
+ * Also ensure that metadataStore.put is invoked (to write the run.id) + * @throws Exception + */ + @Test + public void testRunIdForBatch() throws Exception { + final Map<String, String> cfgs = new HashMap<>(); + cfgs.put(ApplicationConfig.APP_MODE, "BATCH"); + cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); + cfgs.put(JobConfig.JOB_NAME(), "test-task-job"); + cfgs.put(JobConfig.JOB_ID(), "jobId"); + config = new MapConfig(cfgs); + mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName()); + + prepareTestForRunId(); + runner.run(); + + verify(coordinationUtils, Mockito.times(1)).getLock(CoordinationConstants.RUNID_LOCK_ID); + verify(clusterMembership, Mockito.times(1)).getNumberOfProcessors(); + verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class)); + } + + /** + * For app.mode=STREAM ensure that the run.id generation utils -- + * DistributedLock, ClusterMembership and MetadataStore are NOT created. 
+ * Also ensure that metadataStore.put is NOT invoked + * @throws Exception + */ + @Test + public void testRunIdForStream() throws Exception { + final Map<String, String> cfgs = new HashMap<>(); + cfgs.put(ApplicationConfig.APP_MODE, "STREAM"); + cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName()); + cfgs.put(JobConfig.JOB_NAME(), "test-task-job"); + cfgs.put(JobConfig.JOB_ID(), "jobId"); + config = new MapConfig(cfgs); + mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName()); + + prepareTestForRunId(); + + runner.run(); + + + verify(coordinationUtils, Mockito.times(0)).getLock(CoordinationConstants.RUNID_LOCK_ID); + verify(coordinationUtils, Mockito.times(0)).getClusterMembership(); + verify(clusterMembership, Mockito.times(0)).getNumberOfProcessors(); + verify(metadataStore, Mockito.times(0)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class)); + } + + private void prepareTestForRunId() throws Exception { + coordinationUtils = mock(CoordinationUtils.class); + + DistributedLock lock = mock(DistributedLock.class); + when(lock.lock(anyObject())).thenReturn(true); + when(coordinationUtils.getLock(anyString())).thenReturn(lock); + + clusterMembership = mock(ClusterMembership.class); + when(clusterMembership.getNumberOfProcessors()).thenReturn(1); + when(coordinationUtils.getClusterMembership()).thenReturn(clusterMembership); + + metadataStore = mock(ZkMetadataStore.class); + when(metadataStore.get(any())).thenReturn(null); + PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(metadataStore); + ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc = ApplicationDescriptorUtil.getAppDescriptor(mockApp, config); - localPlanner = spy(new LocalJobPlanner(appDesc)); - runner = spy(new LocalApplicationRunner(appDesc, localPlanner)); + runner = spy(new LocalApplicationRunner(appDesc, Optional.of(coordinationUtils))); + localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID")); + doReturn(localPlanner).when(runner).getPlanner(); + StreamProcessor sp = mock(StreamProcessor.class); + doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), anyObject(), anyObject()); } } diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java new file mode 100644 index 0000000..1775db5 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.zk; + +import org.I0Itec.zkclient.ZkClient; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import org.junit.Test; + +import static junit.framework.Assert.*; + +public class TestZkClusterMembership { + private static EmbeddedZookeeper zkServer = null; + private static String testZkConnectionString = null; + private ZkUtils zkUtils1; + private ZkUtils zkUtils2; + + @BeforeClass + public static void test() { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort()); + } + + @Before + public void testSetup() { + ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); + this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + ZkClient zkClient2 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); + this.zkUtils2 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient2, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + } + + @After + public void testTearDown() { + zkUtils1.close(); + zkUtils2.close(); + } + + @AfterClass + public static void teardown() { + zkServer.teardown(); + } + + @Test + public void testMembershipSingleProcessor() { + // happy path for single processor + ZkClusterMembership clusterMembership = new ZkClusterMembership("p1", zkUtils1); + String processorId; + + assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership.getNumberOfProcessors()); + + processorId = 
clusterMembership.registerProcessor(); + + assertEquals("ClusterMembership does not have participants after a processor registered.", 1, clusterMembership.getNumberOfProcessors()); + + clusterMembership.unregisterProcessor(processorId); + + assertEquals("ClusterMembership has participants after the single processor unregistered.", 0, clusterMembership.getNumberOfProcessors()); + } + + @Test + public void testMembershipTwoProcessors() { + // Two processors register. Check if second processor registering gets 2 as number of processors. + ZkClusterMembership clusterMembership1 = new ZkClusterMembership("p1", zkUtils1); + ZkClusterMembership clusterMembership2 = new ZkClusterMembership("p2", zkUtils1); + + String processorId1; + String processorId2; + + assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership1.getNumberOfProcessors()); + + processorId1 = clusterMembership1.registerProcessor(); + + assertEquals("ClusterMembership does not have participants after one processor registered.", 1, clusterMembership1.getNumberOfProcessors()); + + processorId2 = clusterMembership2.registerProcessor(); + + assertEquals("ClusterMembership does not have 2 participants after two processor registered.", 2, clusterMembership2.getNumberOfProcessors()); + + clusterMembership1.unregisterProcessor(processorId1); + clusterMembership2.unregisterProcessor(processorId2); + + assertEquals("ClusterMembership has participants after both processors unregistered.", 0, clusterMembership1.getNumberOfProcessors()); + } + + @Test + public void testMembershipFirstProcessorUnregister() { + // First processor unregisters. Check if second processor registering gets 1 as number of processors. 
+ ZkClusterMembership clusterMembership1 = new ZkClusterMembership("p1", zkUtils1); + ZkClusterMembership clusterMembership2 = new ZkClusterMembership("p2", zkUtils1); + + String processorId1; + String processorId2; + + assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership1.getNumberOfProcessors()); + + processorId1 = clusterMembership1.registerProcessor(); + + assertEquals("ClusterMembership does not have participants after one processor registered.", 1, clusterMembership1.getNumberOfProcessors()); + + clusterMembership1.unregisterProcessor(processorId1); + + processorId2 = clusterMembership2.registerProcessor(); + + assertEquals("ClusterMembership does not have 1 participant1 after second processor registered.", 1, clusterMembership2.getNumberOfProcessors()); + + clusterMembership2.unregisterProcessor(processorId2); + + assertEquals("ClusterMembership has participants after both processors unregistered.", 0, clusterMembership2.getNumberOfProcessors()); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java new file mode 100644 index 0000000..b5d85aa --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.time.Duration; +import org.I0Itec.zkclient.ZkClient; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.apache.samza.util.NoOpMetricsRegistry; + +import java.util.List; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static junit.framework.Assert.*; + +public class TestZkDistributedLock { + private static EmbeddedZookeeper zkServer = null; + private static String testZkConnectionString = null; + private ZkUtils zkUtils1; + private ZkUtils zkUtils2; + + @BeforeClass + public static void test() { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort()); + } + + @Before + public void testSetup() { + ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); + this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + ZkClient zkClient2 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); + this.zkUtils2 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient2, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + } + + @After + public void testTearDown() { + zkUtils1.close(); 
+ zkUtils2.close(); + } + + @AfterClass + public static void teardown() { + zkServer.teardown(); + } + + private List<String> getParticipants(ZkUtils zkUtils, String lockId) { + String lockPath = String.format("%s/lock_%s", zkUtils1.getKeyBuilder().getRootPath(), lockId); + return zkUtils.getZkClient().getChildren(lockPath); + } + + @Test + public void testLockSingleProcessor() { + String lockId = "FAKE_LOCK_ID_1"; + ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId); + + + assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size()); + + boolean lock1Status = lock1.lock(Duration.ofMillis(10000)); + assertEquals("Lock does not have 1 participant after first processor tries to lock.", 1, getParticipants(zkUtils1, lockId).size()); + assertEquals("1st processor requesting to lock did not acquire the lock.", true, lock1Status); + lock1.unlock(); + assertEquals("Lock does have 1 participant after first processor tries to unlock.", 0, getParticipants(zkUtils1, lockId).size()); + } + + @Test + public void testLockTwoProcessors() { + // second processor should acquire lock after first one unlocks + String lockId = "FAKE_LOCK_ID_2"; + ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId); + ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId); + + assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size()); + + boolean lock1Status = lock1.lock(Duration.ofMillis(10000)); + assertEquals("First processor requesting to lock did not acquire the lock.", true, lock1Status); + lock1.unlock(); + boolean lock2Status = lock2.lock(Duration.ofMillis(10000)); + assertEquals("Second processor requesting to lock did not acquire the lock.", true, lock2Status); + lock2.unlock(); + assertEquals("Lock does have participants after processors unlocked.", 0, getParticipants(zkUtils1, lockId).size()); + } + + @Test + public 
void testLockFirstProcessorClosing() { + // first processor dies before unlock then second processor should acquire + String lockId = "FAKE_LOCK_ID_3"; + ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId); + ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId); + + + assertEquals("Lock has participants before any processor tried to lock!", 0, getParticipants(zkUtils1, lockId).size()); + + boolean lock1Status = lock1.lock(Duration.ofMillis(10000)); + assertEquals("First processor requesting to lock did not acquire the lock.", true, lock1Status); + // first processor dies before unlock + zkUtils1.close(); + + boolean lock2Status = lock2.lock(Duration.ofMillis(10000)); + assertEquals("Second processor requesting to lock did not acquire the lock.", true, lock2Status); + lock2.unlock(); + assertEquals("Lock does have participants after processors unlocked.", 0, getParticipants(zkUtils2, lockId).size()); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java index 3d5f3b3..4d53222 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java @@ -57,7 +57,7 @@ public class TestZkMetadataStore { public void beforeTest() { String testZkConnectionString = String.format("%s:%s", LOCALHOST, zkServer.getPort()); Config zkConfig = new MapConfig(ImmutableMap.of(ZkConfig.ZK_CONNECT, testZkConnectionString)); - zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("/%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap()); + zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap()); } @After diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index e7c3ef8..e40bbc3 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -65,6 +65,7 @@ import org.apache.samza.metadatastore.MetadataStoreFactory; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.runtime.ApplicationRunners; +import org.apache.samza.runtime.LocalApplicationRunner; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.test.StandaloneTestUtils; import org.apache.samza.test.harness.IntegrationTestHarness; @@ -189,40 +190,48 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness { } } - private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) { + private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId, boolean isBatch) { List<String> inputSystemStreams = inputTopics.stream() .map(topic -> String.format("%s.%s", TestZkLocalApplicationRunner.TEST_SYSTEM, topic)) .collect(Collectors.toList()); String coordinatorSystemName = "coordinatorSystem"; - Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder() - .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS) - .put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputSystemStreams)) - .put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM) - .put(TaskConfig.IGNORED_EXCEPTIONS(), "*") - .put(ZkConfig.ZK_CONNECT, zkConnect()) - .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY) - .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY) - .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY) - .put(ApplicationConfig.APP_NAME, appName) 
- .put(ApplicationConfig.APP_ID, appId) - .put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner") - .put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY) - .put(JobConfig.JOB_NAME(), appName) - .put(JobConfig.JOB_ID(), appId) - .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS) - .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true") - .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS) - .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000") - .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true") - .put("job.coordinator.system", coordinatorSystemName) - .put("job.coordinator.replication.factor", "1") - .build(); + Map<String, String> config = new HashMap<>(); + config.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS); + config.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputSystemStreams)); + config.put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM); + config.put(TaskConfig.IGNORED_EXCEPTIONS(), "*"); + config.put(ZkConfig.ZK_CONNECT, zkConnect()); + config.put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY); + config.put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY); + config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY); + config.put(ApplicationConfig.APP_NAME, appName); + config.put(ApplicationConfig.APP_ID, appId); + config.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner"); + config.put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY); + config.put(JobConfig.JOB_NAME(), appName); + config.put(JobConfig.JOB_ID(), appId); + config.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS); + config.put(TaskConfig.DROP_PRODUCER_ERRORS(), "true"); + config.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS); + config.put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000"); + 
config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true"); + config.put("job.coordinator.system", coordinatorSystemName); + config.put("job.coordinator.replication.factor", "1"); + if (isBatch) { + config.put(ApplicationConfig.APP_MODE, "BATCH"); + config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "false"); + } + Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder().putAll(config).build(); Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig); applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true)); applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(TestZkLocalApplicationRunner.TEST_SYSTEM, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true)); return applicationConfig; } + private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) { + return buildStreamApplicationConfigMap(inputTopics, appName, appId, false); + } + /** * sspGrouper is set to GroupBySystemStreamPartitionFactory. * Run a stream application(appRunner1) consuming messages from input topic(effectively one container). @@ -981,6 +990,270 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness { return taskAssignments; } + /** + * Test if two processors coming up at the same time agree on a single runid + * 1. bring up two processors + * 2. wait till they start consuming messages + * 3. 
check if first processor run.id matches that of second processor + */ + @Test + public void testAgreeingOnSameRunIdForBatch() throws InterruptedException { + publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); + + Map<String, String> configMap = + buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); + applicationConfig1 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); + applicationConfig2 = new ApplicationConfig(new MapConfig(configMap)); + + + // Create StreamApplication from configuration. + CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); + CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); + + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null, + applicationConfig1), applicationConfig1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null, + applicationConfig2), applicationConfig2); + + executeRun(appRunner1, applicationConfig1); + executeRun(appRunner2, applicationConfig2); + + processedMessagesLatch1.await(); + processedMessagesLatch2.await(); + + // At this stage, both the processors are running. 
+ // check if their runId matches + + LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1; + LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2; + + assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId()); + + appRunner1.kill(); + appRunner1.waitForFinish(); + appRunner2.kill(); + appRunner2.waitForFinish(); + } + + + /** + * Test if a new processor joining an existing quorum gets the same runid + * 1. bring up two processors + * 2. wait till they start consuming messages + * 3. bring up a third processor + * 4. wait till third processor starts consuming messages + * 5. check if third processor run.id matches that of first two + */ + @Test + public void testNewProcessorGetsSameRunIdForBatch() throws InterruptedException { + publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); + + Map<String, String> configMap = + buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); + applicationConfig1 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); + applicationConfig2 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + applicationConfig3 = new ApplicationConfig(new MapConfig(configMap)); + + // Create StreamApplication from configuration. 
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); + CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); + + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null, + applicationConfig1), applicationConfig1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null, + applicationConfig2), applicationConfig2); + + executeRun(appRunner1, applicationConfig1); + executeRun(appRunner2, applicationConfig2); + + processedMessagesLatch1.await(); + processedMessagesLatch2.await(); + + // At this stage, both the processors are running. + // check if their runId matches + + LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1; + LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2; + + assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId()); + + //Bring up a new processsor + CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); + ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null, + applicationConfig3), applicationConfig3); + executeRun(appRunner3, applicationConfig3); + processedMessagesLatch3.await(); + + // At this stage, the new processor is running. 
+ // check if new processor's runId matches that of the older processors + LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3; + assertEquals("RunId of the new processor does not match that of old processor", localApplicationRunner3.getRunId(), localApplicationRunner1.getRunId()); + + + appRunner1.kill(); + appRunner1.waitForFinish(); + appRunner2.kill(); + appRunner2.waitForFinish(); + appRunner3.kill(); + appRunner3.waitForFinish(); + } + + + /** + * Test one group of processors dying and a new processor coming up generates new run.id + * 1. bring up two processors + * 2. wait till they start consuming messages + * 3. kill and shutdown neatly both the processors + * 4. bring up a new processor + * 5. wait till new processor starts consuming messages + * 6. check if new processor has new runid different from shutdown processors + */ + @Test + public void testAllProcesssorDieNewProcessorGetsNewRunIdForBatch() throws InterruptedException { + publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); + + Map<String, String> configMap = + buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); + applicationConfig1 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); + applicationConfig2 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + applicationConfig3 = new ApplicationConfig(new MapConfig(configMap)); + + // Create StreamApplication from configuration. 
+ CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); + CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); + + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null, + applicationConfig1), applicationConfig1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null, + applicationConfig2), applicationConfig2); + + executeRun(appRunner1, applicationConfig1); + executeRun(appRunner2, applicationConfig2); + + processedMessagesLatch1.await(); + processedMessagesLatch2.await(); + + // At this stage, both the processors are running. + // check if their runId matches + + LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1; + LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2; + + assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId()); + + String oldRunId = localApplicationRunner1.getRunId().get(); + + // shut down both the processors + appRunner1.kill(); + appRunner1.waitForFinish(); + appRunner2.kill(); + appRunner2.waitForFinish(); + + //Bring up a new processsor + CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); + ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null, + applicationConfig3), applicationConfig3); + executeRun(appRunner3, applicationConfig3); + processedMessagesLatch3.await(); + + // At this stage, the new processor is running. 
+ // check if new processor's runId matches that of the older processors + LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3; + + assertNotEquals("RunId of the new processor same as that of old stopped processors", oldRunId, localApplicationRunner3.getRunId()); + + appRunner3.kill(); + appRunner3.waitForFinish(); + } + + + /** + * Test if first processor dying changes the runid for new processors joining + * 1. bring up two processors + * 2. wait till they start consuming messages + * 3. kill and shutdown first processor + * 4. bring up a new processor + * 5. wait till new processor starts consuming messages + * 6. check if new processor gets same run.id + */ + @Test + public void testFirstProcessorDiesButSameRunIdForBatch() throws InterruptedException { + publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]); + + Map<String, String> configMap = + buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]); + applicationConfig1 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]); + applicationConfig2 = new ApplicationConfig(new MapConfig(configMap)); + configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]); + applicationConfig3 = new ApplicationConfig(new MapConfig(configMap)); + + CountDownLatch processedMessagesLatch1 = new CountDownLatch(1); + ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null, + applicationConfig1), applicationConfig1); + executeRun(appRunner1, applicationConfig1); + + // first processor is up and running + processedMessagesLatch1.await(); + + LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1; + + // bring up second processor + 
CountDownLatch processedMessagesLatch2 = new CountDownLatch(1); + ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null, + applicationConfig2), applicationConfig2); + executeRun(appRunner2, applicationConfig2); + + // second processor is up and running + processedMessagesLatch2.await(); + LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2; + + assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId()); + + // shut down first processor + appRunner1.kill(); + appRunner1.waitForFinish(); + + //Bring up a new processsor + CountDownLatch processedMessagesLatch3 = new CountDownLatch(1); + ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance( + TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null, + applicationConfig3), applicationConfig3); + executeRun(appRunner3, applicationConfig3); + processedMessagesLatch3.await(); + + // At this stage, the new processor is running. + // check if new processor runid matches the old ones + LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3; + assertEquals("RunId of the new processor is not the same as that of earlier processors", localApplicationRunner2.getRunId(), localApplicationRunner3.getRunId()); + + + appRunner2.kill(); + appRunner2.waitForFinish(); + appRunner3.kill(); + appRunner3.waitForFinish(); + } + + private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) { System.out.println(jobModel); Set<SystemStreamPartition> ssps = new HashSet<>();