Repository: incubator-gobblin Updated Branches: refs/heads/master ed91dcdae -> ccd7ba769
[GOBBLIN-456] add option to delete state store add option to delete state store Closes #2327 from arjun4084346/addDeleteStateStoreOption Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ccd7ba76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ccd7ba76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ccd7ba76 Branch: refs/heads/master Commit: ccd7ba769308e720db33ea800d964df43df4e878 Parents: ed91dcd Author: Arjun <ab...@linkedin.com> Authored: Mon Apr 9 11:54:31 2018 -0700 Committer: Hung Tran <hut...@linkedin.com> Committed: Mon Apr 9 11:54:58 2018 -0700 ---------------------------------------------------------------------- .../gobblin/runtime/api/SpecExecutor.java | 3 +- .../gobblin/runtime/api/SpecProducer.java | 7 ++- .../orchestration/AzkabanSpecProducer.java | 3 +- .../orchestration/AzkabanProjectConfigTest.java | 11 ++-- .../service/SimpleKafkaSpecProducer.java | 6 +- .../service/StreamingKafkaSpecConsumer.java | 4 +- .../gobblin/service/FlowConfigClient.java | 18 ++++++ .../gobblin/service/FlowConfigsResource.java | 19 +++++- .../org/apache/gobblin/runtime/api/JobSpec.java | 31 +++++++++- .../gobblin/runtime/api/MutableSpecCatalog.java | 7 +-- .../apache/gobblin/runtime/api/SpecCatalog.java | 6 +- .../runtime/api/SpecCatalogListener.java | 9 ++- .../job_monitor/AvroJobSpecKafkaJobMonitor.java | 62 ++++++++++++++++---- .../runtime/job_monitor/KafkaJobMonitor.java | 24 ++------ .../runtime/job_spec/ResolvedJobSpec.java | 2 +- .../runtime/spec_catalog/FlowCatalog.java | 10 +++- .../spec_catalog/SpecCatalogListenersList.java | 5 +- .../runtime/spec_catalog/TopologyCatalog.java | 9 ++- .../InMemorySpecProducer.java | 3 +- .../job_monitor/KafkaJobMonitorTest.java | 18 ------ .../job_monitor/MockedKafkaJobMonitor.java | 1 + .../modules/flow/BaseFlowToJobSpecCompiler.java | 8 ++- .../modules/orchestration/Orchestrator.java | 13 ++-- .../scheduler/GobblinServiceJobScheduler.java | 8 ++- 24 files changed, 198 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java index cb5197a..85ee7af 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java @@ -58,7 +58,8 @@ public interface SpecExecutor { public static enum Verb { ADD(1, "add"), UPDATE(2, "update"), - DELETE(3, "delete"); + DELETE(3, "delete"), + UNKNOWN(4, "unknown"); private int _id; private String _verb; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java index 9b9e504..880847d 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java @@ -19,6 +19,7 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.List; +import java.util.Properties; import java.util.concurrent.Future; import org.apache.gobblin.annotation.Alpha; @@ -38,8 +39,12 @@ public interface SpecProducer<V> { /** Update a {@link Spec} being executed on {@link SpecExecutor}. */ Future<?> updateSpec(V updatedSpec); + default Future<?> deleteSpec(URI deletedSpecURI) { + return deleteSpec(deletedSpecURI, new Properties()); + } + /** Delete a {@link Spec} being executed on {@link SpecExecutor}. */ - Future<?> deleteSpec(URI deletedSpecURI); + Future<?> deleteSpec(URI deletedSpecURI, Properties headers); /** List all {@link Spec} being executed on {@link SpecExecutor}. */ Future<? extends List<V>> listSpecs(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java index 7b11cef..a1ae133 100644 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.util.List; +import java.util.Properties; import java.util.concurrent.Future; import org.apache.commons.codec.EncoderException; @@ -139,7 +140,7 @@ public class AzkabanSpecProducer implements SpecProducer<Spec>, Closeable { } @Override - public Future<?> deleteSpec(URI deletedSpecURI) { + public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) { // Delete project JobSpec jobSpec = new JobSpec.Builder(deletedSpecURI).build(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java index 9e189ab..3a48806 100644 --- a/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java +++ b/gobblin-modules/gobblin-azkaban/src/test/java/org/apache/gobblin/service/modules/orchestration/AzkabanProjectConfigTest.java @@ -17,6 +17,7 @@ package org.apache.gobblin.service.modules.orchestration; import java.net.URI; +import java.util.Collections; import java.util.Properties; import lombok.extern.slf4j.Slf4j; @@ -39,7 +40,7 @@ public class AzkabanProjectConfigTest { Properties properties = new Properties(); JobSpec jobSpec = new JobSpec(new URI("uri"), "0.0", "test job spec", - ConfigUtils.propertiesToConfig(properties), properties, Optional.absent()); + ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP); AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec); String actualProjectName = azkabanProjectConfig.getAzkabanProjectName(); @@ -54,7 +55,7 @@ public class AzkabanProjectConfigTest { Properties properties = new Properties(); properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix"); JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec", - ConfigUtils.propertiesToConfig(properties), properties, Optional.absent()); + ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP); AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec); String actualProjectName = azkabanProjectConfig.getAzkabanProjectName(); @@ -69,7 +70,7 @@ public class AzkabanProjectConfigTest { Properties properties = new Properties(); properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName"); JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"), - "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent()); + "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP); AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec); String actualProjectName = azkabanProjectConfig.getAzkabanProjectName(); @@ -84,7 +85,7 @@ public class AzkabanProjectConfigTest { Properties properties = new Properties(); properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefix"); JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context"), "0.0", "test job spec", - ConfigUtils.propertiesToConfig(properties), properties, Optional.absent()); + ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP); AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec); String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename(); @@ -99,7 +100,7 @@ public class AzkabanProjectConfigTest { Properties properties = new Properties(); properties.setProperty("gobblin.service.azkaban.project.namePrefix", "randomPrefixWithReallyLongName"); JobSpec jobSpec = new JobSpec(new URI("http://localhost:8000/context/that-keeps-expanding-and-explanding"), - "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent()); + "0.0", "test job spec", ConfigUtils.propertiesToConfig(properties), properties, Optional.absent(), Collections.EMPTY_MAP); AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig(jobSpec); String actualZipFileName = azkabanProjectConfig.getAzkabanProjectZipFilename(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java index a5163db..c56593c 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecProducer.java @@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.util.List; import java.util.concurrent.Future; +import java.util.Properties; import org.apache.commons.lang3.reflect.ConstructorUtils; import org.slf4j.Logger; @@ -101,10 +102,11 @@ public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable { } @Override - public Future<?> deleteSpec(URI deletedSpecURI) { + public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) { AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString()) - .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name())).build(); + .setMetadata(ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.DELETE.name())) + .setProperties(Maps.fromProperties(headers)).build(); log.info("Deleting Spec: " + deletedSpecURI + " using Kafka."); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java index 6d8de39..ef44c7d 100644 --- a/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java +++ b/gobblin-modules/gobblin-service-kafka/src/main/java/org/apache/gobblin/service/StreamingKafkaSpecConsumer.java @@ -224,8 +224,8 @@ public class StreamingKafkaSpecConsumer extends AbstractIdleService implements S } private long getRemovedSpecs() { - return StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs() != null? - StreamingKafkaSpecConsumer.this._jobMonitor.getRemmovedSpecs().getCount() : 0; + return StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs() != null? + StreamingKafkaSpecConsumer.this._jobMonitor.getRemovedSpecs().getCount() : 0; } private long getMessageParseFailures() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java index 28255bb..a1c983e 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java @@ -52,6 +52,7 @@ public class FlowConfigClient implements Closeable { private Optional<HttpClientFactory> _httpClientFactory; private Optional<RestClient> _restClient; private final FlowconfigsRequestBuilders _flowconfigsRequestBuilders; + public static final String DELETE_STATE_STORE_KEY = "delete.state.store"; /** * Construct a {@link FlowConfigClient} to communicate with http flow config server at URI serverUri @@ -156,6 +157,23 @@ public class FlowConfigClient implements Closeable { response.getResponse(); } + /** + * Delete a flow configuration + * @param flowId identifier of flow configuration to delete + * @throws RemoteInvocationException + */ + public void deleteFlowConfigWithStateStore(FlowId flowId) + throws RemoteInvocationException { + LOG.debug("deleteFlowConfig and state store with groupName " + flowId.getFlowGroup() + " flowName " + + flowId.getFlowName()); + + DeleteRequest<FlowConfig> deleteRequest = _flowconfigsRequestBuilders.delete() + .id(new ComplexResourceKey<>(flowId, new EmptyRecord())).setHeader(DELETE_STATE_STORE_KEY, Boolean.TRUE.toString()).build(); + ResponseFuture<EmptyRecord> response = _restClient.get().sendRequest(deleteRequest); + + response.getResponse(); + } + @Override public void close() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java index f0bce17..9074a43 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java @@ -19,7 +19,9 @@ package org.apache.gobblin.service; import java.net.URI; import java.net.URISyntaxException; +import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -46,12 +48,17 @@ import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; +import com.google.common.collect.ImmutableSet; + + /** * Resource for handling flow configuration requests */ @RestLiCollection(name = "flowconfigs", namespace = "org.apache.gobblin.service", keyName = "id") public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, EmptyRecord, FlowConfig> { private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsResource.class); + private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store"); + @edu.umd.cs.findbugs.annotations.SuppressWarnings("MS_SHOULD_BE_FINAL") public static FlowCatalog _globalFlowCatalog; @@ -234,7 +241,7 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), "/" + flowGroup + "/" + flowName, null, null); - getFlowCatalog().remove(flowUri); + getFlowCatalog().remove(flowUri, getHeaders()); return new UpdateResponse(HttpStatus.S_200_OK); } catch (URISyntaxException e) { @@ -244,6 +251,16 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt return null; } + private Properties getHeaders() { + Properties headerProperties = new Properties(); + for (Map.Entry<String, String> entry : getContext().getRequestHeaders().entrySet()) { + if (ALLOWED_METADATA.contains(entry.getKey())) { + headerProperties.put(entry.getKey(), entry.getValue()); + } + } + return headerProperties; + } + /*** * This method is to workaround injection issues where Service has only one active global FlowCatalog * .. and is not able to inject it via RestLI bootstrap. We should remove this and make injected http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java index 0ae943e..203ea8d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/JobSpec.java @@ -19,11 +19,13 @@ package org.apache.gobblin.runtime.api; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.util.Map; import java.util.Properties; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -34,8 +36,10 @@ import org.apache.gobblin.util.ConfigUtils; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.extern.slf4j.Slf4j; +@Slf4j /** * Defines a Gobblin Job that can be run once, or multiple times. A {@link JobSpec} is * {@link Configurable} so it has an associated {@link Config}, along with other mandatory @@ -64,6 +68,12 @@ public class JobSpec implements Configurable, Spec { /** URI of {@link org.apache.gobblin.runtime.api.JobTemplate} to use. */ Optional<URI> templateURI; + /** Metadata can contain properties which are not a part of config, e.g. Verb */ + Map<String, String> metadata; + + /** A Verb identifies if the Spec is for Insert/Update/Delete */ + public static final String VERB_KEY = "Verb"; + public static Builder builder(URI jobSpecUri) { return new Builder(jobSpecUri); } @@ -131,6 +141,7 @@ public class JobSpec implements Configurable, Spec { private Optional<String> description = Optional.absent(); private Optional<URI> jobCatalogURI = Optional.absent(); private Optional<URI> templateURI = Optional.absent(); + private Optional<Map> metadata = Optional.absent(); public Builder(URI jobSpecUri) { Preconditions.checkNotNull(jobSpecUri); @@ -156,7 +167,7 @@ public class JobSpec implements Configurable, Spec { Preconditions.checkNotNull(this.uri); Preconditions.checkNotNull(this.version); return new JobSpec(getURI(), getVersion(), getDescription(), getConfig(), - getConfigAsProperties(), getTemplateURI()); + getConfigAsProperties(), getTemplateURI(), getMetadata()); } /** The scheme and authority of the job catalog URI are used to generate JobSpec URIs from @@ -289,6 +300,24 @@ public class JobSpec implements Configurable, Spec { this.templateURI = Optional.of(templateURI); return this; } + + public Map getDefaultMetadata() { + log.warn("Job Spec Verb is not provided, using type 'UNKNOWN'."); + return ImmutableMap.of(VERB_KEY, SpecExecutor.Verb.UNKNOWN.name()); + } + + public Map getMetadata() { + if (!this.metadata.isPresent()) { + this.metadata = Optional.of(getDefaultMetadata()); + } + return this.metadata.get(); + } + + public Builder withMetadata(Map<String, String> metadata) { + Preconditions.checkNotNull(metadata); + this.metadata = Optional.of(metadata); + return this; + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java index 108a324..7a3e946 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java @@ -18,15 +18,12 @@ package org.apache.gobblin.runtime.api; import java.net.URI; -import java.util.ArrayList; import java.util.Collection; +import java.util.Properties; import java.util.concurrent.TimeUnit; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareTimer; -import org.apache.gobblin.util.ConfigUtils; import com.google.common.base.Optional; import com.typesafe.config.Config; @@ -51,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog { * Removes an existing {@link Spec} with the given URI. * Throws SpecNotFoundException if such {@link Spec} does not exist. */ - void remove(URI uri) throws SpecNotFoundException; + void remove(URI uri, Properties headers) throws SpecNotFoundException; @Slf4j public static class MutableStandardMetrics extends StandardMetrics { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java index 457be9a..024c20c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java @@ -19,13 +19,12 @@ package org.apache.gobblin.runtime.api; import java.net.URI; import java.util.Collection; -import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.typesafe.config.Config; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -33,7 +32,6 @@ import org.apache.gobblin.instrumented.GobblinMetricsKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareGauge; -import org.apache.gobblin.metrics.ContextAwareMetric; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; @@ -141,7 +139,7 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, StandardMetr } @Override - public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) { this.totalDeletedSpecs.incrementAndGet(); submitTrackingEvent(deletedSpecURI, deletedSpecVersion, SPEC_DELETED_OPERATION_TYPE); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java index 2b0aa40..1448231 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java @@ -18,6 +18,7 @@ package org.apache.gobblin.runtime.api; import java.net.URI; +import java.util.Properties; import com.google.common.base.Objects; @@ -31,7 +32,7 @@ public interface SpecCatalogListener { /** * Invoked when a {@link Spec} gets removed from the catalog. */ - public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion); + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers); /** * Invoked when the contents of a {@link Spec} gets updated in the catalog. @@ -56,18 +57,20 @@ public interface SpecCatalogListener { public static class DeleteSpecCallback extends Callback<SpecCatalogListener, Void> { private final URI _deletedSpecURI; private final String _deletedSpecVersion; + private final Properties _headers; - public DeleteSpecCallback(URI deletedSpecURI, String deletedSpecVersion) { + public DeleteSpecCallback(URI deletedSpecURI, String deletedSpecVersion, Properties headers) { super(Objects.toStringHelper("onDeleteSpec") .add("deletedSpecURI", deletedSpecURI) .add("deletedSpecVersion", deletedSpecVersion) .toString()); _deletedSpecURI = deletedSpecURI; _deletedSpecVersion = deletedSpecVersion; + _headers = headers; } @Override public Void apply(SpecCatalogListener listener) { - listener.onDeleteSpec(_deletedSpecURI, _deletedSpecVersion); + listener.onDeleteSpec(_deletedSpecURI, _deletedSpecVersion, _headers); return null; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java index 59733d3..e035326 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/AvroJobSpecKafkaJobMonitor.java @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import java.util.Collection; import java.util.Properties; +import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -36,10 +37,12 @@ import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobSpecMonitor; import org.apache.gobblin.runtime.api.JobSpecMonitorFactory; import org.apache.gobblin.runtime.api.MutableJobCatalog; -import org.apache.gobblin.runtime.api.SpecExecutor.Verb; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.runtime.job_spec.AvroJobSpec; import org.apache.gobblin.util.Either; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + +import kafka.message.MessageAndMetadata; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -54,7 +57,7 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec> public static final String CONFIG_PREFIX = "gobblin.jobMonitor.avroJobSpec"; public static final String TOPIC_KEY = "topic"; public static final String SCHEMA_VERSION_READER_CLASS = "versionReaderClass"; - protected static final String VERB_KEY = "Verb"; + public static final String DELETE_STATE_STORE_KEY = "delete.state.store"; private static final Config DEFAULTS = ConfigFactory.parseMap(ImmutableMap.of( SCHEMA_VERSION_READER_CLASS, FixedSchemaVersionWriter.class.getName())); @@ -103,9 +106,9 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec> } /** - * Creates a {@link JobSpec} or {@link URI} from the {@link AvroJobSpec} record. + * Creates {@link JobSpec} from the {@link AvroJobSpec} record. * @param record the record as an {@link AvroJobSpec} - * @return a {@link JobSpec} or {@link URI} wrapped in a {@link Collection} of {@link Either} + * @return a {@link JobSpec} wrapped in a {@link Collection} of {@link Either} */ @Override public Collection<Either<JobSpec, URI>> parseJobSpec(AvroJobSpec record) { @@ -114,7 +117,7 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec> Properties props = new Properties(); props.putAll(record.getProperties()); jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion()) - .withDescription(record.getDescription()).withConfigAsProperties(props); + .withDescription(record.getDescription()).withConfigAsProperties(props).withMetadata(record.getMetadata()); if (!record.getTemplateUri().isEmpty()) { try { @@ -124,17 +127,52 @@ public class AvroJobSpecKafkaJobMonitor extends KafkaAvroJobMonitor<AvroJobSpec> } } - String verbName = record.getMetadata().get(VERB_KEY); - Verb verb = Verb.valueOf(verbName); - JobSpec jobSpec = jobSpecBuilder.build(); log.info("Parsed job spec " + jobSpec.toString()); - if (verb == Verb.ADD || verb == Verb.UPDATE) { - return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec)); - } else { - return Lists.newArrayList(Either.<JobSpec, URI>right(jobSpec.getUri())); + return Lists.newArrayList(Either.<JobSpec, URI>left(jobSpec)); + } + + @Override + protected void processMessage(MessageAndMetadata<byte[], byte[]> message) { + try { + Collection<Either<JobSpec, URI>> parsedCollection = parseJobSpec(message.message()); + for (Either<JobSpec, URI> parsedMessage : parsedCollection) { + JobSpec jobSpec = ((Either.Left<JobSpec, URI>)parsedMessage).getLeft(); + if (jobSpec.getMetadata().get(JobSpec.VERB_KEY).equalsIgnoreCase(SpecExecutor.Verb.DELETE.name())) { + this.removedSpecs.inc(); + URI jobSpecUri = jobSpec.getUri(); + this.jobCatalog.remove(jobSpecUri); + + // Refer FlowConfigsResources:delete to understand the pattern of flow URI + // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI + if (jobSpec.getConfig().hasPath(DELETE_STATE_STORE_KEY) && + Boolean.parseBoolean(jobSpec.getConfig().getString(DELETE_STATE_STORE_KEY))) { + // Delete the job state if it is a delete spec request + String[] uriTokens = jobSpecUri.getPath().split("/"); + if (null == this.datasetStateStore) { + log.warn("Job state store deletion failed as datasetstore is not initialized."); + continue; + } + if (uriTokens.length != 3) { + log.error("Invalid URI {}.", jobSpecUri); + continue; + } + String jobName = uriTokens[2]; + this.datasetStateStore.delete(jobName); + log.info("JobSpec {} deleted with statestore.", jobSpecUri); + } else { + log.info("JobSpec {} deleted keeping statestore.", jobSpecUri); + } + } else { + this.newSpecs.inc(); + this.jobCatalog.put(jobSpec); + } + } + } catch (IOException ioe) { + String messageStr = new String(message.message(), Charsets.UTF_8); + log.error(String.format("Failed to delete job/jobStateStore or parse kafka message with offset %d: %s.", message.offset(), messageStr), ioe); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java index 6902eae..9d79fc0 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.typesafe.config.Config; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.JobSpecMonitor; @@ -52,13 +51,13 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> public static final String KAFKA_AUTO_OFFSET_RESET_KEY = KAFKA_JOB_MONITOR_PREFIX + ".auto.offset.reset"; public static final String KAFKA_AUTO_OFFSET_RESET_SMALLEST = "smallest"; public static final String KAFKA_AUTO_OFFSET_RESET_LARGEST = "largest"; - private DatasetStateStore datasetStateStore; - private final MutableJobCatalog jobCatalog; + protected DatasetStateStore datasetStateStore; + protected final MutableJobCatalog jobCatalog; @Getter - private Counter newSpecs; + protected Counter newSpecs; @Getter - private Counter remmovedSpecs; + protected Counter removedSpecs; /** * @return A collection of either {@link JobSpec}s to add/update or {@link URI}s to remove from the catalog, @@ -81,7 +80,7 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> protected void createMetrics() { super.createMetrics(); this.newSpecs = this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_NEW_SPECS); - this.remmovedSpecs = this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_REMOVED_SPECS); + this.removedSpecs = this.getMetricContext().counter(RuntimeMetrics.GOBBLIN_JOB_MONITOR_KAFKA_REMOVED_SPECS); } @VisibleForTesting @@ -106,19 +105,8 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]> this.newSpecs.inc(); this.jobCatalog.put(((Either.Left<JobSpec, URI>) parsedMessage).getLeft()); } else if (parsedMessage instanceof Either.Right) { - this.remmovedSpecs.inc(); + this.removedSpecs.inc(); this.jobCatalog.remove(((Either.Right<JobSpec, URI>) parsedMessage).getRight()); - - // Refer FlowConfigsResources:delete to understand the pattern of flow URI - // FlowToJobSpec Compilers use the flowSpecURI to derive jobSpecURI - String[] uriTokens = ((URI)(((Either.Right) parsedMessage).getRight())).getPath().split("/"); - if (uriTokens.length == 3) { - String jobName = uriTokens[2]; - // Delete the job state if it is a delete spec request - if (this.datasetStateStore != null) { - this.datasetStateStore.delete(jobName); - } - } } } } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java index 8847467..b64f178 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_spec/ResolvedJobSpec.java @@ -59,7 +59,7 @@ public class ResolvedJobSpec extends JobSpec { public ResolvedJobSpec(JobSpec other, JobCatalog catalog) throws SpecNotFoundException, JobTemplate.TemplateException { super(other.getUri(), other.getVersion(), other.getDescription(), resolveConfig(other, catalog), - ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI()); + ConfigUtils.configToProperties(resolveConfig(other, catalog)), other.getTemplateURI(), other.getMetadata()); this.originalJobSpec = other; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index f78be47..f9ae420 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -23,6 +23,8 @@ import java.net.URI; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Properties; + import javax.annotation.Nonnull; import org.apache.commons.lang3.SerializationUtils; @@ -255,8 +257,12 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut } } - @Override public void remove(URI uri) { + remove(uri, new Properties()); + } + + @Override + public void remove(URI uri, Properties headers) { try { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(uri); @@ -264,7 +270,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut log.info(String.format("Removing FlowSpec with URI: %s", uri)); specStore.deleteSpec(uri); this.metrics.updateRemoveSpecTime(startTime); - this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION); + this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, headers); } catch (IOException e) { throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java index cdb9379..f2cd04b 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/SpecCatalogListenersList.java @@ -56,6 +56,7 @@ import java.io.IOException; import java.net.URI; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.Properties; import org.slf4j.Logger; @@ -109,11 +110,11 @@ public class SpecCatalogListenersList implements SpecCatalogListener, SpecCatalo } @Override - public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) { Preconditions.checkNotNull(deletedSpecURI); try { - _disp.execCallbacks(new SpecCatalogListener.DeleteSpecCallback(deletedSpecURI, deletedSpecVersion)); + _disp.execCallbacks(new SpecCatalogListener.DeleteSpecCallback(deletedSpecURI, deletedSpecVersion, headers)); } catch (InterruptedException e) { getLog().warn("onDeleteSpec interrupted."); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java index 5c25a67..a842abd 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import javax.annotation.Nonnull; @@ -241,14 +242,18 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, } } - @Override public void remove(URI uri) { + remove(uri, new Properties()); + } + + @Override + public void remove(URI uri, Properties headers) { try { Preconditions.checkState(state() == Service.State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(uri); log.info(String.format("Removing TopologySpec with URI: %s", uri)); - this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION); + this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION, headers); specStore.deleteSpec(uri); } catch (IOException e) { throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java index 80f64ec..cc74757 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java @@ -21,6 +21,7 @@ import java.io.Serializable; import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.Future; import com.google.common.collect.Lists; @@ -65,7 +66,7 @@ public class InMemorySpecProducer implements SpecProducer<Spec>, Serializable { } @Override - public Future<?> deleteSpec(URI deletedSpecURI) { + public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) { if (!provisionedSpecs.containsKey(deletedSpecURI)) { throw new RuntimeException("Spec not found: " + deletedSpecURI); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java index 57f99a9..c825a12 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitorTest.java @@ -19,16 +19,12 @@ package org.apache.gobblin.runtime.job_monitor; import java.net.URI; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; import com.google.common.base.Optional; import com.typesafe.config.Config; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.kafka.HighLevelConsumerTest; @@ -38,8 +34,6 @@ public class KafkaJobMonitorTest { public void test() throws Exception { Config config = HighLevelConsumerTest.getSimpleConfig(Optional.of(KafkaJobMonitor.KAFKA_JOB_MONITOR_PREFIX)); - String stateStoreRootDir = config.getString(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY); - FileSystem fs = FileSystem.getLocal(new Configuration()); MockedKafkaJobMonitor monitor = MockedKafkaJobMonitor.create(config); monitor.startAsync(); @@ -66,18 +60,6 @@ public class KafkaJobMonitorTest { Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("job2"))); Assert.assertEquals(monitor.getJobSpecs().get(new URI("job2")).getVersion(), "2"); - monitor.getMockKafkaStream().pushToStream("/flow3/job3:1"); - monitor.awaitExactlyNSpecs(3); - Assert.assertTrue(monitor.getJobSpecs().containsKey(new URI("/flow3/job3"))); - - // TODO: Currently, state stores are not categorized by flow name. - // This can lead to one job overwriting other jobs' job state. - fs.create(new Path(stateStoreRootDir, "job3")); - Assert.assertTrue(fs.exists(new Path(stateStoreRootDir, "job3"))); - monitor.getMockKafkaStream().pushToStream(MockedKafkaJobMonitor.REMOVE + ":/flow3/job3"); - monitor.awaitExactlyNSpecs(2); - Assert.assertFalse(fs.exists(new Path(stateStoreRootDir, "job3"))); - monitor.shutDown(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java index 7d3ef37..9e55236 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/job_monitor/MockedKafkaJobMonitor.java @@ -98,6 +98,7 @@ class MockedKafkaJobMonitor extends KafkaJobMonitor { return jobCatalog; } + @Override public Collection<Either<JobSpec, URI>> parseJobSpec(byte[] message) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java index 855d692..e4c9ae3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -22,6 +22,8 @@ import java.net.URI; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; + import javax.annotation.Nonnull; import com.codahale.metrics.Meter; @@ -158,8 +160,12 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec); } + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties()); + } + @Override - public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) { if (topologySpecMap.containsKey(deletedSpecURI)) { topologySpecMap.remove(deletedSpecURI); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index e2d36aa..1b3907d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -22,6 +22,7 @@ import java.net.URI; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -144,13 +145,17 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { } } + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties()); + } + /** {@inheritDoc} */ @Override - public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) { _log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion); if (topologyCatalog.isPresent()) { - this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion); + this.specCompiler.onDeleteSpec(deletedSpecURI, deletedSpecVersion, headers); } } @@ -211,7 +216,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } - public void remove(Spec spec) { + public void remove(Spec spec, Properties headers) { // TODO: Evolve logic to cache and reuse previously compiled JobSpecs // .. this will work for Identity compiler but not always for multi-hop. // Note: Current logic assumes compilation is consistent between all executions @@ -232,7 +237,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { Spec jobSpec = specsToDelete.getKey(); _log.info(String.format("Going to delete JobSpec: %s on Executor: %s", jobSpec, producer)); - producer.deleteSpec(jobSpec.getUri()); + producer.deleteSpec(jobSpec.getUri(), headers); } catch(Exception e) { _log.error("Cannot successfully delete spec: " + specsToDelete.getKey() + " on executor: " + producer + " for flow: " + spec, e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ccd7ba76/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java index ae18fc2..328c742 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java @@ -237,9 +237,13 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata } } + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties()); + } + /** {@inheritDoc} */ @Override - public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) { if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) { // Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore // .. Specs if in cluster mode and Helix is not yet initialized @@ -259,7 +263,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata try { Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString()); if (null != deletedSpec) { - this.orchestrator.remove(deletedSpec); + this.orchestrator.remove(deletedSpec, headers); this.scheduledFlowSpecs.remove(deletedSpecURI.toString()); unscheduleJob(deletedSpecURI.toString()); } else {