http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java deleted file mode 100644 index e961066..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/testing/TestDataflowPipelineRunner.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.testing; - -import com.google.api.services.dataflow.model.JobMessage; -import com.google.api.services.dataflow.model.JobMetrics; -import com.google.api.services.dataflow.model.MetricUpdate; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.PipelineResult.State; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DataflowJobExecutionException; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineJob; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; -import com.google.cloud.dataflow.sdk.runners.PipelineRunner; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.util.MonitoringUtil; -import com.google.cloud.dataflow.sdk.util.MonitoringUtil.JobMessagesHandler; -import com.google.cloud.dataflow.sdk.values.PInput; -import com.google.cloud.dataflow.sdk.values.POutput; -import com.google.common.base.Optional; -import com.google.common.base.Throwables; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.math.BigDecimal; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -/** - * {@link TestDataflowPipelineRunner} is a pipeline runner that wraps a - * {@link DataflowPipelineRunner} when running tests against the {@link TestPipeline}. 
- * - * @see TestPipeline - */ -public class TestDataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob> { - private static final String TENTATIVE_COUNTER = "tentative"; - private static final Logger LOG = LoggerFactory.getLogger(TestDataflowPipelineRunner.class); - - private final TestDataflowPipelineOptions options; - private final DataflowPipelineRunner runner; - private int expectedNumberOfAssertions = 0; - - TestDataflowPipelineRunner(TestDataflowPipelineOptions options) { - this.options = options; - this.runner = DataflowPipelineRunner.fromOptions(options); - } - - /** - * Constructs a runner from the provided options. - */ - public static TestDataflowPipelineRunner fromOptions( - PipelineOptions options) { - TestDataflowPipelineOptions dataflowOptions = options.as(TestDataflowPipelineOptions.class); - - return new TestDataflowPipelineRunner(dataflowOptions); - } - - @Override - public DataflowPipelineJob run(Pipeline pipeline) { - return run(pipeline, runner); - } - - DataflowPipelineJob run(Pipeline pipeline, DataflowPipelineRunner runner) { - - final DataflowPipelineJob job; - try { - job = runner.run(pipeline); - } catch (DataflowJobExecutionException ex) { - throw new IllegalStateException("The dataflow failed."); - } - - LOG.info("Running Dataflow job {} with {} expected assertions.", - job.getJobId(), expectedNumberOfAssertions); - - CancelWorkflowOnError messageHandler = new CancelWorkflowOnError( - job, new MonitoringUtil.PrintHandler(options.getJobMessageOutput())); - - try { - final Optional<Boolean> result; - - if (options.isStreaming()) { - Future<Optional<Boolean>> resultFuture = options.getExecutorService().submit( - new Callable<Optional<Boolean>>() { - @Override - public Optional<Boolean> call() throws Exception { - try { - for (;;) { - Optional<Boolean> result = checkForSuccess(job); - if (result.isPresent()) { - return result; - } - Thread.sleep(10000L); - } - } finally { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - job.cancel(); - } - } - }); - State finalState = job.waitToFinish(10L, TimeUnit.MINUTES, messageHandler); - if (finalState == null || finalState == State.RUNNING) { - LOG.info("Dataflow job {} took longer than 10 minutes to complete, cancelling.", - job.getJobId()); - job.cancel(); - } - result = resultFuture.get(); - } else { - job.waitToFinish(-1, TimeUnit.SECONDS, messageHandler); - result = checkForSuccess(job); - } - if (!result.isPresent()) { - throw new IllegalStateException( - "The dataflow did not output a success or failure metric."); - } else if (!result.get()) { - throw new AssertionError(messageHandler.getErrorMessage() == null ? - "The dataflow did not return a failure reason." 
- : messageHandler.getErrorMessage()); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - Throwables.propagateIfPossible(e.getCause()); - throw new RuntimeException(e.getCause()); - } catch (IOException e) { - throw new RuntimeException(e); - } - return job; - } - - @Override - public <OutputT extends POutput, InputT extends PInput> OutputT apply( - PTransform<InputT, OutputT> transform, InputT input) { - if (transform instanceof PAssert.OneSideInputAssert - || transform instanceof PAssert.TwoSideInputAssert) { - expectedNumberOfAssertions += 1; - } - - return runner.apply(transform, input); - } - - Optional<Boolean> checkForSuccess(DataflowPipelineJob job) - throws IOException { - State state = job.getState(); - if (state == State.FAILED || state == State.CANCELLED) { - LOG.info("The pipeline failed"); - return Optional.of(false); - } - - JobMetrics metrics = job.getDataflowClient().projects().jobs() - .getMetrics(job.getProjectId(), job.getJobId()).execute(); - - if (metrics == null || metrics.getMetrics() == null) { - LOG.warn("Metrics not present for Dataflow job {}.", job.getJobId()); - } else { - int successes = 0; - int failures = 0; - for (MetricUpdate metric : metrics.getMetrics()) { - if (metric.getName() == null || metric.getName().getContext() == null - || !metric.getName().getContext().containsKey(TENTATIVE_COUNTER)) { - // Don't double count using the non-tentative version of the metric. - continue; - } - if (PAssert.SUCCESS_COUNTER.equals(metric.getName().getName())) { - successes += ((BigDecimal) metric.getScalar()).intValue(); - } else if (PAssert.FAILURE_COUNTER.equals(metric.getName().getName())) { - failures += ((BigDecimal) metric.getScalar()).intValue(); - } - } - - if (failures > 0) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, - expectedNumberOfAssertions); - return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { - LOG.info("Found result while running Dataflow job {}. Found {} success, {} failures out of " - + "{} expected assertions.", job.getJobId(), successes, failures, - expectedNumberOfAssertions); - return Optional.of(true); - } - - LOG.info("Running Dataflow job {}. Found {} success, {} failures out of {} expected " - + "assertions.", job.getJobId(), successes, failures, expectedNumberOfAssertions); - } - - return Optional.<Boolean>absent(); - } - - @Override - public String toString() { - return "TestDataflowPipelineRunner#" + options.getAppName(); - } - - /** - * Cancels the workflow on the first error message it sees. - * - * <p>Creates an error message representing the concatenation of all error messages seen. - */ - private static class CancelWorkflowOnError implements JobMessagesHandler { - private final DataflowPipelineJob job; - private final JobMessagesHandler messageHandler; - private final StringBuffer errorMessage; - private CancelWorkflowOnError(DataflowPipelineJob job, JobMessagesHandler messageHandler) { - this.job = job; - this.messageHandler = messageHandler; - this.errorMessage = new StringBuffer(); - } - - @Override - public void process(List<JobMessage> messages) { - messageHandler.process(messages); - for (JobMessage message : messages) { - if (message.getMessageImportance() != null - && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) { - LOG.info("Dataflow job {} threw exception. 
Failure message was: {}", - job.getJobId(), message.getMessageText()); - errorMessage.append(message.getMessageText()); - } - } - if (errorMessage.length() > 0) { - LOG.info("Cancelling Dataflow job {}", job.getJobId()); - try { - job.cancel(); - } catch (Exception ignore) { - // The TestDataflowPipelineRunner will thrown an AssertionError with the job failure - // messages. - } - } - } - - private String getErrorMessage() { - return errorMessage.toString(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java deleted file mode 100644 index aeb864a..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowPathValidator.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.util; - -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; -import com.google.common.base.Preconditions; - -import java.io.IOException; - -/** - * GCP implementation of {@link PathValidator}. Only GCS paths are allowed. - */ -public class DataflowPathValidator implements PathValidator { - - private DataflowPipelineOptions dataflowOptions; - - DataflowPathValidator(DataflowPipelineOptions options) { - this.dataflowOptions = options; - } - - public static DataflowPathValidator fromOptions(PipelineOptions options) { - return new DataflowPathValidator(options.as(DataflowPipelineOptions.class)); - } - - /** - * Validates that the input GCS path is accessible and that the path - * is well formed. - */ - @Override - public String validateInputFilePatternSupported(String filepattern) { - GcsPath gcsPath = getGcsPath(filepattern); - Preconditions.checkArgument( - dataflowOptions.getGcsUtil().isGcsPatternSupported(gcsPath.getObject())); - String returnValue = verifyPath(filepattern); - verifyPathIsAccessible(filepattern, "Could not find file %s"); - return returnValue; - } - - /** - * Validates that the output GCS path is accessible and that the path - * is well formed.
- */ - @Override - public String validateOutputFilePrefixSupported(String filePrefix) { - String returnValue = verifyPath(filePrefix); - verifyPathIsAccessible(filePrefix, "Output path does not exist or is not writeable: %s"); - return returnValue; - } - - @Override - public String verifyPath(String path) { - GcsPath gcsPath = getGcsPath(path); - Preconditions.checkArgument(gcsPath.isAbsolute(), - "Must provide absolute paths for Dataflow"); - Preconditions.checkArgument(!gcsPath.getObject().contains("//"), - "Dataflow Service does not allow objects with consecutive slashes"); - return gcsPath.toResourceName(); - } - - private void verifyPathIsAccessible(String path, String errorMessage) { - GcsPath gcsPath = getGcsPath(path); - try { - Preconditions.checkArgument(dataflowOptions.getGcsUtil().bucketExists(gcsPath), - errorMessage, path); - } catch (IOException e) { - throw new RuntimeException( - String.format("Unable to verify that GCS bucket gs://%s exists.", gcsPath.getBucket()), - e); - } - } - - private GcsPath getGcsPath(String path) { - try { - return GcsPath.fromUri(path); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException(String.format( - "%s expected a valid 'gs://' path but was given '%s'", - dataflowOptions.getRunner().getSimpleName(), path), e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java deleted file mode 100644 index 18e6654..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/DataflowTransport.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.util; - -import static com.google.cloud.dataflow.sdk.util.Transport.getJsonFactory; -import static com.google.cloud.dataflow.sdk.util.Transport.getTransport; - -import com.google.api.client.auth.oauth2.Credential; -import com.google.api.client.http.HttpRequestInitializer; -import com.google.api.services.clouddebugger.v2.Clouddebugger; -import com.google.api.services.dataflow.Dataflow; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; -import com.google.common.collect.ImmutableList; - -import java.net.MalformedURLException; -import java.net.URL; - -/** - * Helpers for cloud communication. 
- */ -public class DataflowTransport { - - - private static class ApiComponents { - public String rootUrl; - public String servicePath; - - public ApiComponents(String root, String path) { - this.rootUrl = root; - this.servicePath = path; - } - } - - private static ApiComponents apiComponentsFromUrl(String urlString) { - try { - URL url = new URL(urlString); - String rootUrl = url.getProtocol() + "://" + url.getHost() + - (url.getPort() > 0 ? ":" + url.getPort() : ""); - return new ApiComponents(rootUrl, url.getPath()); - } catch (MalformedURLException e) { - throw new RuntimeException("Invalid URL: " + urlString); - } - } - - /** - * Returns a Google Cloud Dataflow client builder. - */ - public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options) { - String servicePath = options.getDataflowEndpoint(); - ApiComponents components; - if (servicePath.contains("://")) { - components = apiComponentsFromUrl(servicePath); - } else { - components = new ApiComponents(options.getApiRootUrl(), servicePath); - } - - return new Dataflow.Builder(getTransport(), - getJsonFactory(), - chainHttpRequestInitializer( - options.getGcpCredential(), - // Do not log 404. It clutters the output and is possibly even required by the caller. - new RetryHttpRequestInitializer(ImmutableList.of(404)))) - .setApplicationName(options.getAppName()) - .setRootUrl(components.rootUrl) - .setServicePath(components.servicePath) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) { - return new Clouddebugger.Builder(getTransport(), - getJsonFactory(), - chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer())) - .setApplicationName(options.getAppName()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - /** - * Returns a Dataflow client that does not automatically retry failed - * requests. - */ - public static Dataflow.Builder - newRawDataflowClient(DataflowPipelineOptions options) { - return newDataflowClient(options) - .setHttpRequestInitializer(options.getGcpCredential()) - .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); - } - - private static HttpRequestInitializer chainHttpRequestInitializer( - Credential credential, HttpRequestInitializer httpRequestInitializer) { - if (credential == null) { - return httpRequestInitializer; - } else { - return new ChainingHttpRequestInitializer(credential, httpRequestInitializer); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java deleted file mode 100644 index 7307e83..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/GcsStager.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.util; - -import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineDebugOptions; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.common.base.Preconditions; - -import java.util.List; - -/** - * Utility class for staging files to GCS. - */ -public class GcsStager implements Stager { - private DataflowPipelineOptions options; - - private GcsStager(DataflowPipelineOptions options) { - this.options = options; - } - - public static GcsStager fromOptions(PipelineOptions options) { - return new GcsStager(options.as(DataflowPipelineOptions.class)); - } - - @Override - public List<DataflowPackage> stageFiles() { - Preconditions.checkNotNull(options.getStagingLocation()); - List<String> filesToStage = options.getFilesToStage(); - String windmillBinary = - options.as(DataflowPipelineDebugOptions.class).getOverrideWindmillBinary(); - if (windmillBinary != null) { - filesToStage.add("windmill_main=" + windmillBinary); - } - return PackageUtil.stageClasspathElements( - options.getFilesToStage(), options.getStagingLocation()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java deleted file mode 100644 index 2c06a92..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/MonitoringUtil.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.cloud.dataflow.sdk.util; - -import static com.google.cloud.dataflow.sdk.util.TimeUtil.fromCloudTime; - -import com.google.api.services.dataflow.Dataflow; -import com.google.api.services.dataflow.Dataflow.Projects.Jobs.Messages; -import com.google.api.services.dataflow.model.JobMessage; -import com.google.api.services.dataflow.model.ListJobMessagesResponse; -import com.google.cloud.dataflow.sdk.PipelineResult.State; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; -import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableMap; - -import org.joda.time.Instant; - -import java.io.IOException; -import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Map; - -import javax.annotation.Nullable; - -/** - * A helper class for monitoring jobs submitted to the service. - */ -public final class MonitoringUtil { - - private static final String GCLOUD_DATAFLOW_PREFIX = "gcloud alpha dataflow"; - private static final String ENDPOINT_OVERRIDE_ENV_VAR = - "CLOUDSDK_API_ENDPOINT_OVERRIDES_DATAFLOW"; - - private static final Map<String, State> DATAFLOW_STATE_TO_JOB_STATE = - ImmutableMap - .<String, State>builder() - .put("JOB_STATE_UNKNOWN", State.UNKNOWN) - .put("JOB_STATE_STOPPED", State.STOPPED) - .put("JOB_STATE_RUNNING", State.RUNNING) - .put("JOB_STATE_DONE", State.DONE) - .put("JOB_STATE_FAILED", State.FAILED) - .put("JOB_STATE_CANCELLED", State.CANCELLED) - .put("JOB_STATE_UPDATED", State.UPDATED) - .build(); - - private String projectId; - private Messages messagesClient; - - /** - * An interface that can be used for defining callbacks to receive a list - * of JobMessages containing monitoring information. - */ - public interface JobMessagesHandler { - /** Process the rows. */ - void process(List<JobMessage> messages); - } - - /** A handler that prints monitoring messages to a stream. */ - public static class PrintHandler implements JobMessagesHandler { - private PrintStream out; - - /** - * Construct the handler. - * - * @param stream The stream to write the messages to. - */ - public PrintHandler(PrintStream stream) { - out = stream; - } - - @Override - public void process(List<JobMessage> messages) { - for (JobMessage message : messages) { - if (message.getMessageText() == null || message.getMessageText().isEmpty()) { - continue; - } - String importanceString = null; - if (message.getMessageImportance() == null) { - continue; - } else if (message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) { - importanceString = "Error: "; - } else if (message.getMessageImportance().equals("JOB_MESSAGE_WARNING")) { - importanceString = "Warning: "; - } else if (message.getMessageImportance().equals("JOB_MESSAGE_BASIC")) { - importanceString = "Basic: "; - } else if (message.getMessageImportance().equals("JOB_MESSAGE_DETAILED")) { - importanceString = "Detail: "; - } else { - // TODO: Remove filtering here once getJobMessages supports minimum - // importance. - continue; - } - @Nullable Instant time = TimeUtil.fromCloudTime(message.getTime()); - if (time == null) { - out.print("UNKNOWN TIMESTAMP: "); - } else { - out.print(time + ": "); - } - if (importanceString != null) { - out.print(importanceString); - } - out.println(message.getMessageText()); - } - out.flush(); - } - } - - /** Construct a helper for monitoring. 
*/ - public MonitoringUtil(String projectId, Dataflow dataflow) { - this(projectId, dataflow.projects().jobs().messages()); - } - - // @VisibleForTesting - MonitoringUtil(String projectId, Messages messagesClient) { - this.projectId = projectId; - this.messagesClient = messagesClient; - } - - /** - * Comparator for sorting rows in increasing order based on timestamp. - */ - public static class TimeStampComparator implements Comparator<JobMessage> { - @Override - public int compare(JobMessage o1, JobMessage o2) { - @Nullable Instant t1 = fromCloudTime(o1.getTime()); - if (t1 == null) { - return -1; - } - @Nullable Instant t2 = fromCloudTime(o2.getTime()); - if (t2 == null) { - return 1; - } - return t1.compareTo(t2); - } - } - - /** - * Return job messages sorted in ascending order by timestamp. - * @param jobId The id of the job to get the messages for. - * @param startTimestampMs Return only those messages with a - * timestamp greater than this value. - * @return collection of messages - * @throws IOException - */ - public ArrayList<JobMessage> getJobMessages( - String jobId, long startTimestampMs) throws IOException { - // TODO: Allow filtering messages by importance - Instant startTimestamp = new Instant(startTimestampMs); - ArrayList<JobMessage> allMessages = new ArrayList<>(); - String pageToken = null; - while (true) { - Messages.List listRequest = messagesClient.list(projectId, jobId); - if (pageToken != null) { - listRequest.setPageToken(pageToken); - } - ListJobMessagesResponse response = listRequest.execute(); - - if (response == null || response.getJobMessages() == null) { - return allMessages; - } - - for (JobMessage m : response.getJobMessages()) { - @Nullable Instant timestamp = fromCloudTime(m.getTime()); - if (timestamp == null) { - continue; - } - if (timestamp.isAfter(startTimestamp)) { - allMessages.add(m); - } - } - - if (response.getNextPageToken() == null) { - break; - } else { - pageToken = response.getNextPageToken(); - } - } - - Collections.sort(allMessages, new TimeStampComparator()); - return allMessages; - } - - public static String getJobMonitoringPageURL(String projectName, String jobId) { - try { - // Project name is allowed in place of the project id: the user will be redirected to a URL - // that has the project name replaced with project id. - return String.format( - "https://console.developers.google.com/project/%s/dataflow/job/%s", - URLEncoder.encode(projectName, "UTF-8"), - URLEncoder.encode(jobId, "UTF-8")); - } catch (UnsupportedEncodingException e) { - // Should never happen. - throw new AssertionError("UTF-8 encoding is not supported by the environment", e); - } - } - - public static String getGcloudCancelCommand(DataflowPipelineOptions options, String jobId) { - - // If using a different Dataflow API than default, prefix command with an API override. - String dataflowApiOverridePrefix = ""; - String apiUrl = options.getDataflowClient().getBaseUrl(); - if (!apiUrl.equals(Dataflow.DEFAULT_BASE_URL)) { - dataflowApiOverridePrefix = String.format("%s=%s ", ENDPOINT_OVERRIDE_ENV_VAR, apiUrl); - } - - // Assemble cancel command from optional prefix and project/job parameters. 
- return String.format("%s%s jobs --project=%s cancel %s", - dataflowApiOverridePrefix, GCLOUD_DATAFLOW_PREFIX, options.getProject(), jobId); - } - - public static State toState(String stateName) { - return MoreObjects.firstNonNull(DATAFLOW_STATE_TO_JOB_STATE.get(stateName), - State.UNKNOWN); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java deleted file mode 100644 index 0e234a8..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/PackageUtil.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.util; - -import com.google.api.client.util.BackOffUtils; -import com.google.api.client.util.Sleeper; -import com.google.api.services.dataflow.model.DataflowPackage; -import com.google.cloud.hadoop.util.ApiErrorExtractor; -import com.google.common.hash.Funnels; -import com.google.common.hash.Hasher; -import com.google.common.hash.Hashing; -import com.google.common.io.CountingOutputStream; -import com.google.common.io.Files; - -import com.fasterxml.jackson.core.Base64Variants; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Objects; - -/** Helper routines for packages. */ -public class PackageUtil { - private static final Logger LOG = LoggerFactory.getLogger(PackageUtil.class); - /** - * A reasonable upper bound on the number of jars required to launch a Dataflow job. - */ - public static final int SANE_CLASSPATH_SIZE = 1000; - /** - * The initial interval to use between package staging attempts. - */ - private static final long INITIAL_BACKOFF_INTERVAL_MS = 5000L; - /** - * The maximum number of attempts when staging a file. - */ - private static final int MAX_ATTEMPTS = 5; - - /** - * Translates exceptions from API calls. - */ - private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor(); - - /** - * Creates a DataflowPackage containing information about how a classpath element should be - * staged, including the staging destination as well as its size and hash. 
- * - * @param classpathElement The local path for the classpath element. - * @param stagingPath The base location for staged classpath elements. - * @param overridePackageName If non-null, use the given value as the package name - * instead of generating one automatically. - * @return The package. - */ - @Deprecated - public static DataflowPackage createPackage(File classpathElement, - String stagingPath, String overridePackageName) { - return createPackageAttributes(classpathElement, stagingPath, overridePackageName) - .getDataflowPackage(); - } - - /** - * Compute and cache the attributes of a classpath element that we will need to stage it. - * - * @param classpathElement the file or directory to be staged. - * @param stagingPath The base location for staged classpath elements. - * @param overridePackageName If non-null, use the given value as the package name - * instead of generating one automatically. - * @return a {@link PackageAttributes} that contains metadata about the object to be staged. - */ - static PackageAttributes createPackageAttributes(File classpathElement, - String stagingPath, String overridePackageName) { - try { - boolean directory = classpathElement.isDirectory(); - - // Compute size and hash in one pass over file or directory. - Hasher hasher = Hashing.md5().newHasher(); - OutputStream hashStream = Funnels.asOutputStream(hasher); - CountingOutputStream countingOutputStream = new CountingOutputStream(hashStream); - - if (!directory) { - // Files are staged as-is. - Files.asByteSource(classpathElement).copyTo(countingOutputStream); - } else { - // Directories are recursively zipped. - ZipFiles.zipDirectory(classpathElement, countingOutputStream); - } - - long size = countingOutputStream.getCount(); - String hash = Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes()); - - // Create the DataflowPackage with staging name and location. - String uniqueName = getUniqueContentName(classpathElement, hash); - String resourcePath = IOChannelUtils.resolve(stagingPath, uniqueName); - DataflowPackage target = new DataflowPackage(); - target.setName(overridePackageName != null ? overridePackageName : uniqueName); - target.setLocation(resourcePath); - - return new PackageAttributes(size, hash, directory, target); - } catch (IOException e) { - throw new RuntimeException("Package setup failure for " + classpathElement, e); - } - } - - /** - * Transfers the classpath elements to the staging location. - * - * @param classpathElements The elements to stage. - * @param stagingPath The base location to stage the elements to. - * @return A list of cloud workflow packages, each representing a classpath element. - */ - public static List<DataflowPackage> stageClasspathElements( - Collection<String> classpathElements, String stagingPath) { - return stageClasspathElements(classpathElements, stagingPath, Sleeper.DEFAULT); - } - - // Visible for testing. - static List<DataflowPackage> stageClasspathElements( - Collection<String> classpathElements, String stagingPath, - Sleeper retrySleeper) { - LOG.info("Uploading {} files from PipelineOptions.filesToStage to staging location to " - + "prepare for execution.", classpathElements.size()); - - if (classpathElements.size() > SANE_CLASSPATH_SIZE) { - LOG.warn("Your classpath contains {} elements, which Google Cloud Dataflow automatically " - + "copies to all workers. Having this many entries on your classpath may be indicative " - + "of an issue in your pipeline.
You may want to consider trimming the classpath to " + "necessary dependencies only, using the --filesToStage pipeline option to override " + "what files are being staged, or bundling several dependencies into one.", - classpathElements.size()); - } - - ArrayList<DataflowPackage> packages = new ArrayList<>(); - - if (stagingPath == null) { - throw new IllegalArgumentException( - "Can't stage classpath elements because no staging location has been provided"); - } - - int numUploaded = 0; - int numCached = 0; - for (String classpathElement : classpathElements) { - String packageName = null; - if (classpathElement.contains("=")) { - String[] components = classpathElement.split("=", 2); - packageName = components[0]; - classpathElement = components[1]; - } - - File file = new File(classpathElement); - if (!file.exists()) { - LOG.warn("Skipping non-existent classpath element {} that was specified.", - classpathElement); - continue; - } - - PackageAttributes attributes = createPackageAttributes(file, stagingPath, packageName); - - DataflowPackage workflowPackage = attributes.getDataflowPackage(); - packages.add(workflowPackage); - String target = workflowPackage.getLocation(); - - // TODO: Should we attempt to detect the Mime type rather than - // always using MimeTypes.BINARY? - try { - try { - long remoteLength = IOChannelUtils.getSizeBytes(target); - if (remoteLength == attributes.getSize()) { - LOG.debug("Skipping classpath element already staged: {} at {}", - classpathElement, target); - numCached++; - continue; - } - } catch (FileNotFoundException expected) { - // If the file doesn't exist, it means we need to upload it. - } - - // Upload file, retrying on failure. - AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff( - MAX_ATTEMPTS, - INITIAL_BACKOFF_INTERVAL_MS); - while (true) { - try { - LOG.debug("Uploading classpath element {} to {}", classpathElement, target); - try (WritableByteChannel writer = IOChannelUtils.create(target, MimeTypes.BINARY)) { - copyContent(classpathElement, writer); - } - numUploaded++; - break; - } catch (IOException e) { - if (ERROR_EXTRACTOR.accessDenied(e)) { - String errorMessage = String.format( - "Upload failed due to a permissions error, will NOT retry staging " - + "of classpath %s. Please verify credentials are valid and that you have " - + "write access to %s. Stale credentials can be resolved by executing " - + "'gcloud auth login'.", classpathElement, target); - LOG.error(errorMessage); - throw new IOException(errorMessage, e); - } else if (!backoff.atMaxAttempts()) { - LOG.warn("Upload attempt failed, sleeping before retrying staging of classpath: {}", - classpathElement, e); - BackOffUtils.next(retrySleeper, backoff); - } else { - // Rethrow last error, to be included as a cause in the catch below. - LOG.error("Upload failed, will NOT retry staging of classpath: {}", - classpathElement, e); - throw e; - } - } - } - } catch (Exception e) { - throw new RuntimeException("Could not stage classpath element: " + classpathElement, e); - } - } - - LOG.info("Uploading PipelineOptions.filesToStage complete: {} files newly uploaded, " - + "{} files cached", - numUploaded, numCached); - - return packages; - } - - /** - * Returns a unique name for a file with a given content hash. - * - * <p>Directory paths are removed.
Example: - * <pre> - * dir="a/b/c/d", contentHash="f000" => d-f000.jar - * file="a/b/c/d.txt", contentHash="f000" => d-f000.txt - * file="a/b/c/d", contentHash="f000" => d-f000 - * </pre> - */ - static String getUniqueContentName(File classpathElement, String contentHash) { - String fileName = Files.getNameWithoutExtension(classpathElement.getAbsolutePath()); - String fileExtension = Files.getFileExtension(classpathElement.getAbsolutePath()); - if (classpathElement.isDirectory()) { - return fileName + "-" + contentHash + ".jar"; - } else if (fileExtension.isEmpty()) { - return fileName + "-" + contentHash; - } - return fileName + "-" + contentHash + "." + fileExtension; - } - - /** - * Copies the contents of the classpathElement to the output channel. - * - * <p>If the classpathElement is a directory, a Zip stream is constructed on the fly, - * otherwise the file contents are copied as-is. - * - * <p>The output channel is not closed. - */ - private static void copyContent(String classpathElement, WritableByteChannel outputChannel) - throws IOException { - final File classpathElementFile = new File(classpathElement); - if (classpathElementFile.isDirectory()) { - ZipFiles.zipDirectory(classpathElementFile, Channels.newOutputStream(outputChannel)); - } else { - Files.asByteSource(classpathElementFile).copyTo(Channels.newOutputStream(outputChannel)); - } - } - /** - * Holds the metadata necessary to stage a file or confirm that a staged file has not changed. - */ - static class PackageAttributes { - private final boolean directory; - private final long size; - private final String hash; - private DataflowPackage dataflowPackage; - - public PackageAttributes(long size, String hash, boolean directory, - DataflowPackage dataflowPackage) { - this.size = size; - this.hash = Objects.requireNonNull(hash, "hash"); - this.directory = directory; - this.dataflowPackage = Objects.requireNonNull(dataflowPackage, "dataflowPackage"); - } - - /** - * @return the dataflowPackage - */ - public DataflowPackage getDataflowPackage() { - return dataflowPackage; - } - - /** - * @return the directory - */ - public boolean isDirectory() { - return directory; - } - - /** - * @return the size - */ - public long getSize() { - return size; - } - - /** - * @return the hash - */ - public String getHash() { - return hash; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java b/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java deleted file mode 100644 index f6c6a71..0000000 --- a/runners/google-cloud-dataflow-java/src/main/java/com/google/cloud/dataflow/sdk/util/Stager.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.sdk.util; - -import com.google.api.services.dataflow.model.DataflowPackage; - -import java.util.List; - -/** - * Interface for staging files needed for running a Dataflow pipeline. - */ -public interface Stager { - /* Stage files and return a list of packages. */ - public List<DataflowPackage> stageFiles(); -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java new file mode 100644 index 0000000..6bbafdd --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/BlockingDataflowPipelineOptions.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.io.PrintStream; + +/** + * Options that are used to configure the {@link BlockingDataflowPipelineRunner}. + */ +@Description("Configure options on the BlockingDataflowPipelineRunner.") +public interface BlockingDataflowPipelineOptions extends DataflowPipelineOptions { + /** + * Output stream for job status messages. + */ + @Description("Where messages generated during execution of the Dataflow job will be output.") + @JsonIgnore + @Hidden + @Default.InstanceFactory(StandardOutputFactory.class) + PrintStream getJobMessageOutput(); + void setJobMessageOutput(PrintStream value); + + /** + * Returns a default of {@link System#out}. 
+ */ + public static class StandardOutputFactory implements DefaultValueFactory<PrintStream> { + @Override + public PrintStream create(PipelineOptions options) { + return System.out; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java new file mode 100644 index 0000000..3f0503e --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/CloudDebuggerOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import org.apache.beam.sdk.annotations.Experimental; + +import com.google.api.services.clouddebugger.v2.model.Debuggee; + +import javax.annotation.Nullable; + +/** + * Options for controlling Cloud Debugger. + */ +@Description("[Experimental] Used to configure the Cloud Debugger") +@Experimental +@Hidden +public interface CloudDebuggerOptions { + + /** Whether to enable the Cloud Debugger snapshot agent for the current job. */ + @Description("Whether to enable the Cloud Debugger snapshot agent for the current job.") + boolean getEnableCloudDebugger(); + void setEnableCloudDebugger(boolean enabled); + + /** The Cloud Debugger debuggee to associate with. This should not be set directly. */ + @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.") + @Hidden + @Nullable Debuggee getDebuggee(); + void setDebuggee(Debuggee debuggee); + + /** The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. */ + @Description( + "The maximum cost (as a ratio of CPU time) allowed for evaluating conditional snapshots. " + + "Should be a double between 0 and 1. 
" + + "Snapshots will be cancelled if evaluating conditions takes more than this ratio of time.") + @Default.Double(0.01) + double getMaxConditionCost(); + void setMaxConditionCost(double maxConditionCost); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java new file mode 100644 index 0000000..1be93eb --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineDebugOptions.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import org.apache.beam.sdk.annotations.Experimental; + +import com.google.api.services.dataflow.Dataflow; +import com.google.cloud.dataflow.sdk.util.DataflowPathValidator; +import com.google.cloud.dataflow.sdk.util.DataflowTransport; +import com.google.cloud.dataflow.sdk.util.GcsStager; +import com.google.cloud.dataflow.sdk.util.InstanceBuilder; +import com.google.cloud.dataflow.sdk.util.PathValidator; +import com.google.cloud.dataflow.sdk.util.Stager; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.List; +import java.util.Map; + +/** + * Internal. Options used to control execution of the Dataflow SDK for + * debugging and testing purposes. + */ +@Description("[Internal] Options used to control execution of the Dataflow SDK for " + + "debugging and testing purposes.") +@Hidden +public interface DataflowPipelineDebugOptions extends PipelineOptions { + + /** + * The list of backend experiments to enable. + * + * <p>Dataflow provides a number of experimental features that can be enabled + * with this flag. + * + * <p>Please sync with the Dataflow team before enabling any experiments. + */ + @Description("[Experimental] Dataflow provides a number of experimental features that can " + + "be enabled with this flag. Please sync with the Dataflow team before enabling any " + + "experiments.") + @Experimental + List<String> getExperiments(); + void setExperiments(List<String> value); + + /** + * The root URL for the Dataflow API. {@code dataflowEndpoint} can override this value + * if it contains an absolute URL, otherwise {@code apiRootUrl} will be combined with + * {@code dataflowEndpoint} to generate the full URL to communicate with the Dataflow API. + */ + @Description("The root URL for the Dataflow API. 
dataflowEndpoint can override this " + "value if it contains an absolute URL, otherwise apiRootUrl will be combined with " + "dataflowEndpoint to generate the full URL to communicate with the Dataflow API.") + @Default.String(Dataflow.DEFAULT_ROOT_URL) + String getApiRootUrl(); + void setApiRootUrl(String value); + + /** + * Dataflow endpoint to use. + * + * <p>Defaults to the current version of the Google Cloud Dataflow + * API, at the time the current SDK version was released. + * + * <p>If the string contains "://", then this is treated as a URL, + * otherwise {@link #getApiRootUrl()} is used as the root + * URL. + */ + @Description("The URL for the Dataflow API. If the string contains \"://\", this" + " will be treated as the entire URL, otherwise will be treated relative to apiRootUrl.") + @Default.String(Dataflow.DEFAULT_SERVICE_PATH) + String getDataflowEndpoint(); + void setDataflowEndpoint(String value); + + /** + * The path to write the translated Dataflow job specification out to + * at job submission time. The Dataflow job specification will be represented in JSON + * format. + */ + @Description("The path to write the translated Dataflow job specification out to " + + "at job submission time. The Dataflow job specification will be represented in JSON " + + "format.") + String getDataflowJobFile(); + void setDataflowJobFile(String value); + + /** + * The class of the validator that should be created and used to validate paths. + * If pathValidator has not been set explicitly, an instance of this class will be + * constructed and used as the path validator. + */ + @Description("The class of the validator that should be created and used to validate paths. " + + "If pathValidator has not been set explicitly, an instance of this class will be " + + "constructed and used as the path validator.") + @Default.Class(DataflowPathValidator.class) + Class<? extends PathValidator> getPathValidatorClass(); + void setPathValidatorClass(Class<? extends PathValidator> validatorClass); + + /** + * The path validator instance that should be used to validate paths. + * If no path validator has been set explicitly, the default is to use the instance factory that + * constructs a path validator based upon the currently set pathValidatorClass. + */ + @JsonIgnore + @Description("The path validator instance that should be used to validate paths. " + + "If no path validator has been set explicitly, the default is to use the instance factory " + + "that constructs a path validator based upon the currently set pathValidatorClass.") + @Default.InstanceFactory(PathValidatorFactory.class) + PathValidator getPathValidator(); + void setPathValidator(PathValidator validator); + + /** + * The class responsible for staging resources to be accessible by workers + * during job execution. If stager has not been set explicitly, an instance of this class + * will be created and used as the resource stager. + */ + @Description("The class of the stager that should be created and used to stage resources. " + + "If stager has not been set explicitly, an instance of this class will be created " + + "and used as the resource stager.") + @Default.Class(GcsStager.class) + Class<? extends Stager> getStagerClass(); + void setStagerClass(Class<? extends Stager> stagerClass); + + /** + * The resource stager instance that should be used to stage resources. + * If no stager has been set explicitly, the default is to use the instance factory + * that constructs a resource stager based upon the currently set stagerClass.
*/ + @JsonIgnore + @Description("The resource stager instance that should be used to stage resources. " + "If no stager has been set explicitly, the default is to use the instance factory " + "that constructs a resource stager based upon the currently set stagerClass.") + @Default.InstanceFactory(StagerFactory.class) + Stager getStager(); + void setStager(Stager stager); + + /** + * An instance of the Dataflow client. Defaults to creating a Dataflow client + * using the current set of options. + */ + @JsonIgnore + @Description("An instance of the Dataflow client. Defaults to creating a Dataflow client " + + "using the current set of options.") + @Default.InstanceFactory(DataflowClientFactory.class) + Dataflow getDataflowClient(); + void setDataflowClient(Dataflow value); + + /** Returns the default Dataflow client built from the passed in PipelineOptions. */ + public static class DataflowClientFactory implements DefaultValueFactory<Dataflow> { + @Override + public Dataflow create(PipelineOptions options) { + return DataflowTransport.newDataflowClient( + options.as(DataflowPipelineOptions.class)).build(); + } + } + + /** + * Whether to update the currently running pipeline with the same name as this one. + * + * @deprecated This property is replaced by {@link DataflowPipelineOptions#getUpdate()} + */ + @Deprecated + @Description("If set, replace the existing pipeline with the name specified by --jobName with " + + "this pipeline, preserving state.") + boolean getUpdate(); + @Deprecated + void setUpdate(boolean value); + + /** + * Mapping of old PTransform names to new ones, specified as JSON + * <code>{"oldName":"newName",...}</code>. To mark a transform as deleted, make newName the + * empty string. + */ + @JsonIgnore + @Description( + "Mapping of old PTransform names to new ones, specified as JSON " + + "{\"oldName\":\"newName\",...}. To mark a transform as deleted, make newName the empty " + + "string.") + Map<String, String> getTransformNameMapping(); + void setTransformNameMapping(Map<String, String> value); + + /** + * Custom windmill_main binary to use with the streaming runner. + */ + @Description("Custom windmill_main binary to use with the streaming runner") + String getOverrideWindmillBinary(); + void setOverrideWindmillBinary(String value); + + /** + * Number of threads to use on the Dataflow worker harness. If left unspecified, + * the Dataflow service will compute an appropriate number of threads to use. + */ + @Description("Number of threads to use on the Dataflow worker harness. If left unspecified, " + + "the Dataflow service will compute an appropriate number of threads to use.") + int getNumberOfWorkerHarnessThreads(); + void setNumberOfWorkerHarnessThreads(int value); + + /** + * If {@literal true}, save a heap dump before killing a thread or process which is GC + * thrashing or out of memory. The location of the heap file will either be echoed back + * to the user, or the user will be given the opportunity to download the heap file. + * + * <p> + * CAUTION: Heap dumps can be of comparable size to the default boot disk. Consider increasing + * the boot disk size before setting this flag to true. + */ + @Description("If {@literal true}, save a heap dump before killing a thread or process " + + "which is GC thrashing or out of memory.") + boolean getDumpHeapOnOOM(); + void setDumpHeapOnOOM(boolean dumpHeapBeforeExit); + + /** + * Creates a {@link PathValidator} object using the class specified in + * {@link #getPathValidatorClass()}.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
new file mode 100644
index 0000000..dbfafd1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineOptions.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import com.google.cloud.dataflow.sdk.runners.DataflowPipeline;
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.DateTimeUtils;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/**
+ * Options that can be used to configure the {@link DataflowPipeline}.
+ */
+@Description("Options that configure the Dataflow pipeline.")
+public interface DataflowPipelineOptions extends
+    PipelineOptions, GcpOptions, ApplicationNameOptions, DataflowPipelineDebugOptions,
+    DataflowPipelineWorkerPoolOptions, BigQueryOptions,
+    GcsOptions, StreamingOptions, CloudDebuggerOptions, DataflowWorkerLoggingOptions,
+    DataflowProfilingOptions, PubsubOptions {
+
" + + "See https://cloud.google.com/storage/docs/projects for further details.") + @Override + @Validation.Required + @Default.InstanceFactory(DefaultProjectFactory.class) + String getProject(); + @Override + void setProject(String value); + + /** + * GCS path for staging local files, e.g. gs://bucket/object + * + * <p>Must be a valid Cloud Storage URL, beginning with the prefix "gs://" + * + * <p>At least one of {@link PipelineOptions#getTempLocation()} or {@link #getStagingLocation()} + * must be set. If {@link #getStagingLocation()} is not set, then the Dataflow + * pipeline defaults to using {@link PipelineOptions#getTempLocation()}. + */ + @Description("GCS path for staging local files, e.g. \"gs://bucket/object\". " + + "Must be a valid Cloud Storage URL, beginning with the prefix \"gs://\". " + + "At least one of stagingLocation or tempLocation must be set. If stagingLocation is unset, " + + "defaults to using tempLocation.") + String getStagingLocation(); + void setStagingLocation(String value); + + /** + * The Dataflow job name is used as an idempotence key within the Dataflow service. + * If there is an existing job that is currently active, another active job with the same + * name will not be able to be created. Defaults to using the ApplicationName-UserName-Date. + */ + @Description("The Dataflow job name is used as an idempotence key within the Dataflow service. " + + "If there is an existing job that is currently active, another active job with the same " + + "name will not be able to be created. Defaults to using the ApplicationName-UserName-Date.") + @Default.InstanceFactory(JobNameFactory.class) + String getJobName(); + void setJobName(String value); + + /** + * Whether to update the currently running pipeline with the same name as this one. + */ + @Override + @SuppressWarnings("deprecation") // base class member deprecated in favor of this one. + @Description( + "If set, replace the existing pipeline with the name specified by --jobName with " + + "this pipeline, preserving state.") + boolean getUpdate(); + @Override + @SuppressWarnings("deprecation") // base class member deprecated in favor of this one. + void setUpdate(boolean value); + + /** + * Returns a normalized job name constructed from {@link ApplicationNameOptions#getAppName()}, the + * local system user name (if available), and the current time. The normalization makes sure that + * the job name matches the required pattern of [a-z]([-a-z0-9]*[a-z0-9])? and length limit of 40 + * characters. + * + * <p>This job name factory is only able to generate one unique name per second per application + * and user combination. + */ + public static class JobNameFactory implements DefaultValueFactory<String> { + private static final DateTimeFormatter FORMATTER = + DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC); + + @Override + public String create(PipelineOptions options) { + String appName = options.as(ApplicationNameOptions.class).getAppName(); + String normalizedAppName = appName == null || appName.length() == 0 ? 
"dataflow" + : appName.toLowerCase() + .replaceAll("[^a-z0-9]", "0") + .replaceAll("^[^a-z]", "a"); + String userName = MoreObjects.firstNonNull(System.getProperty("user.name"), ""); + String normalizedUserName = userName.toLowerCase() + .replaceAll("[^a-z0-9]", "0"); + String datePart = FORMATTER.print(DateTimeUtils.currentTimeMillis()); + return normalizedAppName + "-" + normalizedUserName + "-" + datePart; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java new file mode 100644 index 0000000..44a9b00 --- /dev/null +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowPipelineWorkerPoolOptions.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.options; + +import org.apache.beam.sdk.annotations.Experimental; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.util.List; + +/** + * Options that are used to configure the Dataflow pipeline worker pool. + */ +@Description("Options that are used to configure the Dataflow pipeline worker pool.") +public interface DataflowPipelineWorkerPoolOptions extends PipelineOptions { + /** + * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling + * algorithm other then {@code NONE} will affect the size of the worker pool. If left unspecified, + * the Dataflow service will determine the number of workers. + */ + @Description("Number of workers to use when executing the Dataflow job. Note that " + + "selection of an autoscaling algorithm other then \"NONE\" will affect the " + + "size of the worker pool. If left unspecified, the Dataflow service will " + + "determine the number of workers.") + int getNumWorkers(); + void setNumWorkers(int value); + + /** + * Type of autoscaling algorithm to use. + */ + @Experimental(Experimental.Kind.AUTOSCALING) + public enum AutoscalingAlgorithmType { + /** Use numWorkers machines. Do not autoscale the worker pool. */ + NONE("AUTOSCALING_ALGORITHM_NONE"), + + @Deprecated + BASIC("AUTOSCALING_ALGORITHM_BASIC"), + + /** Autoscale the workerpool based on throughput (up to maxNumWorkers). 
+    THROUGHPUT_BASED("AUTOSCALING_ALGORITHM_BASIC");
+
+    private final String algorithm;
+
+    private AutoscalingAlgorithmType(String algorithm) {
+      this.algorithm = algorithm;
+    }
+
+    /** Returns the string representation of this type. */
+    public String getAlgorithm() {
+      return this.algorithm;
+    }
+  }
+
+  /**
+   * [Experimental] The autoscaling algorithm to use for the workerpool.
+   *
+   * <ul>
+   *   <li>NONE: does not change the size of the worker pool.</li>
+   *   <li>BASIC: autoscale the worker pool size up to maxNumWorkers until the job completes.</li>
+   *   <li>THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).
+   *   </li>
+   * </ul>
+   */
+  @Description("[Experimental] The autoscaling algorithm to use for the workerpool. " +
+      "NONE: does not change the size of the worker pool. " +
+      "BASIC (deprecated): autoscale the worker pool size up to maxNumWorkers until the job " +
+      "completes. " +
+      "THROUGHPUT_BASED: autoscale the workerpool based on throughput (up to maxNumWorkers).")
+  @Experimental(Experimental.Kind.AUTOSCALING)
+  AutoscalingAlgorithmType getAutoscalingAlgorithm();
+  void setAutoscalingAlgorithm(AutoscalingAlgorithmType value);
+
+  /**
+   * The maximum number of workers to use for the workerpool. This option limits the size of the
+   * workerpool for the lifetime of the job, including
+   * <a href="https://cloud.google.com/dataflow/pipelines/updating-a-pipeline">pipeline updates</a>.
+   * If left unspecified, the Dataflow service will compute a ceiling.
+   */
+  @Description("The maximum number of workers to use for the workerpool. This option limits the " +
+      "size of the workerpool for the lifetime of the job, including pipeline updates. " +
+      "If left unspecified, the Dataflow service will compute a ceiling.")
+  int getMaxNumWorkers();
+  void setMaxNumWorkers(int value);
+
+  /**
+   * Remote worker disk size, in gigabytes, or 0 to use the default size.
+   */
+  @Description("Remote worker disk size, in gigabytes, or 0 to use the default size.")
+  int getDiskSizeGb();
+  void setDiskSizeGb(int value);
+
+  /**
+   * Docker container image that executes Dataflow worker harness, residing in Google Container
+   * Registry.
+   */
+  @Default.InstanceFactory(WorkerHarnessContainerImageFactory.class)
+  @Description("Docker container image that executes Dataflow worker harness, residing in " +
+      "Google Container Registry.")
+  @Hidden
+  String getWorkerHarnessContainerImage();
+  void setWorkerHarnessContainerImage(String value);
+
+  /**
+   * Returns the default Docker container image that executes Dataflow worker harness, residing in
+   * Google Container Registry.
+   */
+  public static class WorkerHarnessContainerImageFactory
+      implements DefaultValueFactory<String> {
+    @Override
+    public String create(PipelineOptions options) {
+      DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+      if (dataflowOptions.isStreaming()) {
+        return DataflowPipelineRunner.STREAMING_WORKER_HARNESS_CONTAINER_IMAGE;
+      } else {
+        return DataflowPipelineRunner.BATCH_WORKER_HARNESS_CONTAINER_IMAGE;
+      }
+    }
+  }
+
" + + "Default is up to the Dataflow service.") + String getNetwork(); + void setNetwork(String value); + + /** + * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching + * workers. + * + * <p>Default is up to the Dataflow service. Expected format is + * regions/REGION/subnetworks/SUBNETWORK. + * + * <p>You may also need to specify network option. + */ + @Description("GCE subnetwork for launching workers. For more information, see the reference " + + "documentation https://cloud.google.com/compute/docs/networking. " + + "Default is up to the Dataflow service.") + String getSubnetwork(); + void setSubnetwork(String value); + + /** + * GCE <a href="https://developers.google.com/compute/docs/zones" + * >availability zone</a> for launching workers. + * + * <p>Default is up to the Dataflow service. + */ + @Description("GCE availability zone for launching workers. See " + + "https://developers.google.com/compute/docs/zones for a list of valid options. " + + "Default is up to the Dataflow service.") + String getZone(); + void setZone(String value); + + /** + * Machine type to create Dataflow worker VMs as. + * + * <p>See <a href="https://cloud.google.com/compute/docs/machine-types">GCE machine types</a> + * for a list of valid options. + * + * <p>If unset, the Dataflow service will choose a reasonable default. + */ + @Description("Machine type to create Dataflow worker VMs as. See " + + "https://cloud.google.com/compute/docs/machine-types for a list of valid options. " + + "If unset, the Dataflow service will choose a reasonable default.") + String getWorkerMachineType(); + void setWorkerMachineType(String value); + + /** + * The policy for tearing down the workers spun up by the service. + */ + public enum TeardownPolicy { + /** + * All VMs created for a Dataflow job are deleted when the job finishes, regardless of whether + * it fails or succeeds. + */ + TEARDOWN_ALWAYS("TEARDOWN_ALWAYS"), + /** + * All VMs created for a Dataflow job are left running when the job finishes, regardless of + * whether it fails or succeeds. + */ + TEARDOWN_NEVER("TEARDOWN_NEVER"), + /** + * All VMs created for a Dataflow job are deleted when the job succeeds, but are left running + * when it fails. (This is typically used for debugging failing jobs by SSHing into the + * workers.) + */ + TEARDOWN_ON_SUCCESS("TEARDOWN_ON_SUCCESS"); + + private final String teardownPolicy; + + private TeardownPolicy(String teardownPolicy) { + this.teardownPolicy = teardownPolicy; + } + + public String getTeardownPolicyName() { + return this.teardownPolicy; + } + } + + /** + * The teardown policy for the VMs. + * + * <p>If unset, the Dataflow service will choose a reasonable default. + */ + @Description("The teardown policy for the VMs. If unset, the Dataflow service will " + + "choose a reasonable default.") + TeardownPolicy getTeardownPolicy(); + void setTeardownPolicy(TeardownPolicy value); + + /** + * List of local files to make available to workers. + * + * <p>Files are placed on the worker's classpath. + * + * <p>The default value is the list of jars from the main program's classpath. + */ + @Description("Files to stage on GCS and make available to workers. " + + "Files are placed on the worker's classpath. " + + "The default value is all files from the classpath.") + @JsonIgnore + List<String> getFilesToStage(); + void setFilesToStage(List<String> value); + + /** + * Specifies what type of persistent disk should be used. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
new file mode 100644
index 0000000..3cd7b03
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowProfilingOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+import org.apache.beam.sdk.annotations.Experimental;
+
+import java.util.HashMap;
+
+/**
+ * Options for controlling profiling of pipeline execution.
+ */
+@Description("[Experimental] Used to configure profiling of the Dataflow pipeline")
+@Experimental
+@Hidden
+public interface DataflowProfilingOptions {
+
+  @Description("Whether to periodically dump profiling information to local disk.\n" +
+      "WARNING: Enabling this option may fill local disk with profiling information.")
+  boolean getEnableProfilingAgent();
+  void setEnableProfilingAgent(boolean enabled);
+
+  @Description(
+      "[INTERNAL] Additional configuration for the profiling agent. Not typically necessary.")
+  @Hidden
+  DataflowProfilingAgentConfiguration getProfilingAgentConfiguration();
+  void setProfilingAgentConfiguration(DataflowProfilingAgentConfiguration configuration);
+
+  /**
+   * Configuration for the profiling agent.
+   */
+  public static class DataflowProfilingAgentConfiguration extends HashMap<String, Object> {
+  }
+}
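DataflowProfilingOptions does not itself extend PipelineOptions; it is mixed into DataflowPipelineOptions (see the interface list in DataflowPipelineOptions.java earlier in this commit), so the flag rides along on the main options object. A minimal sketch:

    // Hedged sketch: enabling the profiling agent via the mixed-in interface.
    DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs(new String[] {"--enableProfilingAgent=true"})
        .as(DataflowPipelineOptions.class);
    // Per the Description above, profiles are periodically dumped to the
    // worker's local disk and may fill it.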
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
new file mode 100644
index 0000000..7705b66
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/options/DataflowWorkerHarnessOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.options;
+
+/**
+ * Options that are used exclusively within the Dataflow worker harness.
+ * These options have no effect at pipeline creation time.
+ */
+@Description("[Internal] Options that are used exclusively within the Dataflow worker harness. " +
+    "These options have no effect at pipeline creation time.")
+@Hidden
+public interface DataflowWorkerHarnessOptions extends DataflowPipelineOptions {
+  /**
+   * The identity of the worker running this pipeline.
+   */
+  @Description("The identity of the worker running this pipeline.")
+  String getWorkerId();
+  void setWorkerId(String value);
+
+  /**
+   * The identity of the Dataflow job.
+   */
+  @Description("The identity of the Dataflow job.")
+  String getJobId();
+  void setJobId(String value);
+
+  /**
+   * The size of the worker's in-memory cache, in megabytes.
+   *
+   * <p>Currently, this cache is used for storing read values of side inputs.
+   */
+  @Description("The size of the worker's in-memory cache, in megabytes.")
+  @Default.Integer(100)
+  Integer getWorkerCacheMb();
+  void setWorkerCacheMb(Integer value);
+}
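Finally, the harness-only options above are populated by the Dataflow service on the worker, so pipeline authors rarely set them by hand. A hedged sketch of programmatic access (the 200 MB value is invented for illustration):

    // PipelineOptionsFactory.as(...) creates an empty options object directly.
    DataflowWorkerHarnessOptions harnessOptions =
        PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
    harnessOptions.setWorkerCacheMb(200); // default is 100 (side-input read cache)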