[jira] [Work logged] (BEAM-2281) call SqlFunctions in operator implementation

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-2281?focusedWorklogId=110453=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110453
 ]

ASF GitHub Bot logged work on BEAM-2281:


Author: ASF GitHub Bot
Created on: 10/Jun/18 03:10
Start Date: 10/Jun/18 03:10
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #5596: [BEAM-2281] 
Improve SQL expression testing so SQL syntax tests suffice; delete extraneous 
unit tests
URL: https://github.com/apache/beam/pull/5596#issuecomment-396016523
 
 
   run go precommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 110453)
Time Spent: 2h  (was: 1h 50m)

> call SqlFunctions in operator implementation
> 
>
> Key: BEAM-2281
> URL: https://issues.apache.org/jira/browse/BEAM-2281
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Xu Mingmin
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>
> Calcite has a collection of functions in 
> {{org.apache.calcite.runtime.SqlFunctions}}. It sounds like a good source to 
> leverage when adding operators as {{BeamSqlExpression}}. 
> [~xumingming] [~app-tarush], any comments?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4365) SQL operator argument evaluation should have one place where it is managed

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4365?focusedWorklogId=110452=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110452
 ]

ASF GitHub Bot logged work on BEAM-4365:


Author: ASF GitHub Bot
Created on: 10/Jun/18 02:35
Start Date: 10/Jun/18 02:35
Worklog Time Spent: 10m 
  Work Description: kennknowles closed pull request #5433: [BEAM-4365] Make 
BeamSqlExpression for operators, use it for string operators
URL: https://github.com/apache/beam/pull/5433
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 40d23793852..6dbbef52594 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -29,11 +29,13 @@
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlDotExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlOperatorExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
+import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.StringOperators;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlMinusExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlModExpression;
@@ -91,15 +93,6 @@
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.math.BeamSqlTruncateExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.reinterpret.BeamSqlReinterpretExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.row.BeamSqlFieldAccessExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlCharLengthExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlConcatExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlInitCapExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlLowerExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlOverlayExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlPositionExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlSubstringExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlTrimExpression;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.string.BeamSqlUpperExpression;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamFilterRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
@@ -351,32 +344,32 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
 
   // string operators
 case "||":
-  ret = new BeamSqlConcatExpression(subExps);
+  ret = new BeamSqlOperatorExpression(StringOperators.CONCAT, subExps);
   break;
 case "POSITION":
-  ret = new BeamSqlPositionExpression(subExps);
+  ret = new BeamSqlOperatorExpression(StringOperators.POSITION, subExps);
   break;
 case "CHAR_LENGTH":
 case "CHARACTER_LENGTH":
-  ret = new BeamSqlCharLengthExpression(subExps);
+  ret = new BeamSqlOperatorExpression(StringOperators.CHAR_LENGTH, subExps);
   break;
 case "UPPER":
-  ret = new BeamSqlUpperExpression(subExps);
+  ret = new BeamSqlOperatorExpression(StringOperators.UPPER, subExps);
   break;
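
The hunk above replaces one expression class per string operator with a single expression class parameterized by an operator. A minimal sketch of that pattern in plain Java follows. All names here (`OperatorSketch`, `Expression`, `Operator`, `OperatorExpression`, `inputRef`) are hypothetical stand-ins chosen for illustration; they are not the signatures of Beam's `BeamSqlOperatorExpression` or `StringOperators`, which this excerpt does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical, simplified stand-ins; NOT the Beam classes of similar name.
final class OperatorSketch {

  /** An expression that produces a value for a given input row. */
  interface Expression {
    Object evaluate(List<Object> row);
  }

  /** An operator only ever sees already-evaluated argument values. */
  interface Operator {
    Object apply(List<Object> arguments);
  }

  /** Evaluates all operands in one place, then delegates to the operator. */
  static final class OperatorExpression implements Expression {
    private final Operator operator;
    private final List<Expression> operands;

    OperatorExpression(Operator operator, List<Expression> operands) {
      this.operator = operator;
      this.operands = operands;
    }

    @Override
    public Object evaluate(List<Object> row) {
      List<Object> arguments = new ArrayList<>();
      for (Expression operand : operands) {
        arguments.add(operand.evaluate(row));
      }
      return operator.apply(arguments);
    }
  }

  // Illustrative string operators written as plain functions of values.
  static final Operator CONCAT =
      args -> ((String) args.get(0)).concat((String) args.get(1));
  static final Operator UPPER =
      args -> ((String) args.get(0)).toUpperCase(Locale.ROOT);
  static final Operator CHAR_LENGTH =
      args -> ((String) args.get(0)).length();

  /** Reads the column at the given index from the row. */
  static Expression inputRef(int index) {
    return row -> row.get(index);
  }

  /** Builds UPPER(col0 || col1) and evaluates it against one row. */
  static Object demo() {
    List<Expression> concatOperands = new ArrayList<>();
    concatOperands.add(inputRef(0));
    concatOperands.add(inputRef(1));
    Expression concat = new OperatorExpression(CONCAT, concatOperands);

    List<Expression> upperOperands = new ArrayList<>();
    upperOperands.add(concat);
    Expression upper = new OperatorExpression(UPPER, upperOperands);

    List<Object> row = new ArrayList<>();
    row.add("beam");
    row.add("sql");
    return upper.evaluate(row);
  }
}
```

In this sketch, operand evaluation lives only in `OperatorExpression.evaluate`, so adding an operator means adding one small function rather than one class.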
   

[beam] branch master updated (c8acf0f -> 33bc9e7)

2018-06-09 Thread kenn
This is an automated email from the ASF dual-hosted git repository.

kenn pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


from c8acf0f  This closes #5582: [BEAM-4519] Add staging_token to 
GetManifestRequest
 add 872d151  Make BeamSqlExpression for operators, use it for string 
operators
 add 33bc9e7  Merge pull request #5433: [BEAM-4365] Make BeamSqlExpression 
for operators, use it for string operators

No new revisions were added by this update.

Summary of changes:
 .../sql/impl/interpreter/BeamSqlFnExecutor.java|  29 +-
 ...eExpression.java => BeamSqlBinaryOperator.java} |  24 +-
 .../{package-info.java => BeamSqlOperator.java}|  15 +-
 ...ression.java => BeamSqlOperatorExpression.java} |  26 +-
 ...ryExpression.java => BeamSqlUnaryOperator.java} |  30 +-
 .../impl/interpreter/operator/StringOperators.java | 242 ++
 .../string/BeamSqlCharLengthExpression.java|  41 --
 .../operator/string/BeamSqlConcatExpression.java   |  65 ---
 .../operator/string/BeamSqlInitCapExpression.java  |  57 ---
 .../operator/string/BeamSqlLowerExpression.java|  41 --
 .../operator/string/BeamSqlOverlayExpression.java  |  76 ---
 .../operator/string/BeamSqlPositionExpression.java |  72 ---
 .../string/BeamSqlSubstringExpression.java |  83 
 .../operator/string/BeamSqlTrimExpression.java | 103 
 .../operator/string/BeamSqlUpperExpression.java|  41 --
 .../interpreter/operator/string/package-info.java  |  20 -
 .../impl/interpreter/BeamSqlFnExecutorTest.java| 125 -
 .../interpreter/operator/StringOperatorsTest.java  | 516 +
 .../string/BeamSqlCharLengthExpressionTest.java|  46 --
 .../string/BeamSqlConcatExpressionTest.java|  67 ---
 .../string/BeamSqlInitCapExpressionTest.java   |  56 ---
 .../string/BeamSqlLowerExpressionTest.java |  44 --
 .../string/BeamSqlOverlayExpressionTest.java   |  91 
 .../string/BeamSqlPositionExpressionTest.java  |  90 
 .../string/BeamSqlStringUnaryExpressionTest.java   |  50 --
 .../string/BeamSqlSubstringExpressionTest.java | 108 -
 .../operator/string/BeamSqlTrimExpressionTest.java | 108 -
 .../string/BeamSqlUpperExpressionTest.java |  44 --
 .../BeamSqlStringFunctionsIntegrationTest.java |   4 +-
 29 files changed, 823 insertions(+), 1491 deletions(-)
 copy sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/{arithmetic/BeamSqlDivideExpression.java => BeamSqlBinaryOperator.java} (56%)
 copy sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/{package-info.java => BeamSqlOperator.java} (70%)
 copy sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/{array/BeamSqlArrayExpression.java => BeamSqlOperatorExpression.java} (65%)
 rename sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/{string/BeamSqlStringUnaryExpression.java => BeamSqlUnaryOperator.java} (58%)
 create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperators.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlCharLengthExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlConcatExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlInitCapExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlLowerExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlOverlayExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlPositionExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlSubstringExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlTrimExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/BeamSqlUpperExpression.java
 delete mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/string/package-info.java
 create mode 100644 sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/StringOperatorsTest.java
 delete mode 100644 

Jenkins build is back to normal : beam_PerformanceTests_HadoopInputFormat #374

2018-06-09 Thread Apache Jenkins Server
See 




[jira] [Work logged] (BEAM-775) Remove Aggregators from the Java SDK

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-775?focusedWorklogId=110449=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110449
 ]

ASF GitHub Bot logged work on BEAM-775:
---

Author: ASF GitHub Bot
Created on: 09/Jun/18 21:39
Start Date: 09/Jun/18 21:39
Worklog Time Spent: 10m 
  Work Description: pabloem commented on issue #2184: [BEAM-775] Remove 
Aggregators from PipelineResults and Examples in Java SDK
URL: https://github.com/apache/beam/pull/2184#issuecomment-396000560
 
 
   For that you can use Metrics.distribution objects, which will track
   minimum, maximum, count and average.
   
   On Sat, Jun 9, 2018, 12:54 PM Abdul Qadeer wrote:
   
   > Thanks. What about calculating mean values?
   >
   > On Thu, 7 Jun 2018 at 21:31, Pablo wrote:
   >
   > > Hey! Yes, the way to do that now is by using `Metrics.counter`. Glad to
   > > help further if you have questions!
   > >
   > > On Thu, Jun 7, 2018, 7:18 PM Abdul Qadeer wrote:
   > >
   > > > @pabloem I was using Aggregator to keep a
   > > > counter across each InputT bundle. What should be used now? Does
   > > > Counter work in my case?
   -- 
   Got feedback? go/pabloem-feedback
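
To make the reply above concrete: a distribution-style metric only needs to keep a running count, sum, minimum and maximum, and the mean falls out as sum divided by count. The following is a plain-Java sketch of that bookkeeping. `DistributionSketch` and its methods are hypothetical names for illustration only; this is not Beam's `Metrics.distribution` implementation, and how a runner aggregates the values across workers is not shown here.

```java
// Hypothetical stand-in for a distribution-style metric; NOT Beam's API.
final class DistributionSketch {
  private long count;
  private long sum;
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;

  /** Records one observed value. */
  void update(long value) {
    count++;
    sum += value;
    min = Math.min(min, value);
    max = Math.max(max, value);
  }

  long count() {
    return count;
  }

  long min() {
    return min;
  }

  long max() {
    return max;
  }

  /** Mean of all recorded values, or 0.0 when nothing was recorded. */
  double mean() {
    return count == 0 ? 0.0 : (double) sum / count;
  }
}
```

A plain counter is the special case that tracks only the running sum of increments, which is why a counter alone cannot answer the question about mean values.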
   




Issue Time Tracking
---

Worklog Id: (was: 110449)
Time Spent: 1h  (was: 50m)

> Remove Aggregators from the Java SDK
> 
>
> Key: BEAM-775
> URL: https://issues.apache.org/jira/browse/BEAM-775
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Pablo Estrada
>Priority: Major
>  Labels: backward-incompatible
> Fix For: 2.0.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>






[jira] [Work logged] (BEAM-4519) Artifact Retrieval Service Protocol should be able to serve multiple Manifests.

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4519?focusedWorklogId=110447=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110447
 ]

ASF GitHub Bot logged work on BEAM-4519:


Author: ASF GitHub Bot
Created on: 09/Jun/18 21:10
Start Date: 09/Jun/18 21:10
Worklog Time Spent: 10m 
  Work Description: jkff commented on issue #5582: [BEAM-4519] Add 
staging_token to GetManifestRequest
URL: https://github.com/apache/beam/pull/5582#issuecomment-395999184
 
 
   Rebased and merged manually.




Issue Time Tracking
---

Worklog Id: (was: 110447)
Time Spent: 2h 40m  (was: 2.5h)

> Artifact Retrieval Service Protocol should be able to serve multiple 
> Manifests.
> ---
>
> Key: BEAM-4519
> URL: https://issues.apache.org/jira/browse/BEAM-4519
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Axel Magnuson
>Assignee: Axel Magnuson
>Priority: Minor
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> The artifact staging service currently returns a staging_token that can be 
> used as a key to access a manifest.  However, the current protocol does not 
> have a field that accepts this token.
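
The gap described above can be illustrated with a small plain-Java sketch: if manifests are stored under their staging token, a retrieval call has to carry that token to pick the right one. `ManifestStoreSketch`, `commit` and `getManifest` are hypothetical names, and a manifest is reduced to a list of artifact names; the real protocol is defined in protobuf and is not reproduced here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model; NOT the Beam artifact service.
final class ManifestStoreSketch {
  // One manifest (here: just artifact names) per staging token.
  private final Map<String, List<String>> manifestsByToken = new HashMap<>();

  /** Stores the manifest committed for one staging session. */
  void commit(String stagingToken, List<String> artifactNames) {
    manifestsByToken.put(stagingToken, artifactNames);
  }

  /** Looks up a manifest; the token is what selects among several. */
  Optional<List<String>> getManifest(String stagingToken) {
    return Optional.ofNullable(manifestsByToken.get(stagingToken));
  }
}
```

Without a token parameter on the lookup, a service holding more than one manifest would have no way to tell which one the caller wants.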





[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110442=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110442
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238250
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.stub.StreamObserver;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import 
org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ArtifactStagingServiceImplBase} based on beam file system.
+ */
+public class BeamFileSystemArtifactStagingService extends 
ArtifactStagingServiceImplBase implements
+FnService {
+
+  private static final Logger LOG =
+  LoggerFactory.getLogger(BeamFileSystemArtifactStagingService.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  // Use UTF8 for all text encoding.
+  private static final Charset CHARSET = StandardCharsets.UTF_8;
+  public static final String MANIFEST = "MANIFEST";
+
+  @Override
+  public StreamObserver putArtifact(
+  StreamObserver responseObserver) {
+return new PutArtifactStreamObserver(responseObserver);
+  }
+
+  @Override
+  public void commitManifest(
+  CommitManifestRequest request, StreamObserver 
responseObserver) {
+try {
+  ResourceId manifestResourceId = 
getManifestFileResourceId(request.getStagingSessionToken());
+  ResourceId artifactDirResourceId = 
getArtifactDirResourceId(request.getStagingSessionToken());
+  ProxyManifest.Builder proxyManifestBuilder = ProxyManifest.newBuilder()
+  .setManifest(request.getManifest());
+  for (ArtifactMetadata artifactMetadata : 
request.getManifest().getArtifactList()) {
+proxyManifestBuilder.addLocation(Location.newBuilder()
+.setName(artifactMetadata.getName())
+.setUri(artifactDirResourceId
+.resolve(encodedFileName(artifactMetadata), 
StandardResolveOptions.RESOLVE_FILE)
+.toString()).build());
+  }
+  try (WritableByteChannel manifestWritableByteChannel = FileSystems
+  .create(manifestResourceId, MimeTypes.TEXT)) {
+
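
The loop in `commitManifest` above pairs each artifact name with a URI resolved under the session's artifact directory. A plain-Java sketch of that mapping follows. The body of `encodedFileName` is not visible in this excerpt, so the SHA-256 hex encoding used here is purely an assumption for illustration, as are the class name `ProxyManifestSketch` and the use of plain strings joined with `/` in place of `ResourceId.resolve`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; the real encodedFileName is not shown in the diff.
final class ProxyManifestSketch {

  /** ASSUMPTION: encode an artifact name as the hex SHA-256 of its UTF-8 bytes. */
  static String encodedFileName(String artifactName) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(artifactName.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : hash) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 not available", e);
    }
  }

  /** Maps each artifact name to a location under the artifact directory. */
  static Map<String, String> locations(String artifactDir, List<String> artifactNames) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String name : artifactNames) {
      result.put(name, artifactDir + "/" + encodedFileName(name));
    }
    return result;
  }
}
```

Encoding the file name keeps arbitrary artifact names from producing invalid or colliding paths, while the manifest still records the original name next to its location.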

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110434=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110434
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238156
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110436=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110436
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238135
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110437=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110437
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238107
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.artifact;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.hash.Hashing;
+import com.google.protobuf.util.JsonFormat;
+import io.grpc.stub.StreamObserver;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactMetadata;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
+import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
+import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ArtifactStagingServiceImplBase} based on beam file system.
+ */
+public class BeamFileSystemArtifactStagingService extends ArtifactStagingServiceImplBase implements
+    FnService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BeamFileSystemArtifactStagingService.class);
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  // Use UTF8 for all text encoding.
+  private static final Charset CHARSET = StandardCharsets.UTF_8;
+  public static final String MANIFEST = "MANIFEST";
+
+  @Override
+  public StreamObserver<PutArtifactRequest> putArtifact(
+      StreamObserver<PutArtifactResponse> responseObserver) {
+    return new PutArtifactStreamObserver(responseObserver);
+  }
+
+  @Override
+  public void commitManifest(
+      CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) {
+    try {
+      ResourceId manifestResourceId = getManifestFileResourceId(request.getStagingSessionToken());
+      ResourceId artifactDirResourceId = getArtifactDirResourceId(request.getStagingSessionToken());
+      ProxyManifest.Builder proxyManifestBuilder = ProxyManifest.newBuilder()
 
 Review comment:
   Oh nice, didn't know we already had such a proto.
   
   CC: @axelmagn 




Issue Time Tracking
---

Worklog Id: (was: 110437)
Time Spent: 11h 40m  (was: 11.5h)

> ArtifactStagingService that stages to a distributed filesystem
> 

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110440&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110440
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238203
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@
+/**
+ * {@link ArtifactStagingServiceImplBase} based on beam file system.
 
 Review comment:
   Please document more about how this works - how it stores artifacts, 
manifests etc.




Issue Time Tracking
---

Worklog Id: (was: 110440)

> ArtifactStagingService that stages to a distributed filesystem
> --
>
> Key: BEAM-4290
> URL: https://issues.apache.org/jira/browse/BEAM-4290
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-core
>Reporter: Eugene Kirpichov
>Assignee: Ankur Goenka
>Priority: Major
>  Time Spent: 12h
>  Remaining Estimate: 0h
>
> Using the job's staging directory from PipelineOptions.
> Physical layout on the distributed filesystem is TBD but it should allow for 
> arbitrary filenames and ideally for eventually avoiding uploading artifacts 
> that are already there.
> Handling credentials is TBD.
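
The requirement that the layout "allow for arbitrary filenames" can be illustrated with a minimal sketch. It assumes the staged file name is derived from a SHA-256 hash of the artifact name; the class name, the `artifact_` prefix, and this particular encoding are hypothetical and are not taken from the pull request, whose `encodedFileName` implementation is not shown in this thread.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch: maps arbitrary artifact names to filesystem-safe file names. */
public class ArtifactNameEncodingSketch {

  /** Returns "artifact_" followed by the hex SHA-256 of the UTF-8 artifact name. */
  public static String encodedFileName(String artifactName) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      byte[] hash = digest.digest(artifactName.getBytes(StandardCharsets.UTF_8));
      StringBuilder encoded = new StringBuilder("artifact_");
      for (byte b : hash) {
        // Two lowercase hex digits per byte, so the result has a fixed length.
        encoded.append(String.format("%02x", b));
      }
      return encoded.toString();
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is a mandatory algorithm on every Java platform.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // Slashes and spaces in the artifact name never reach the filesystem.
    System.out.println(encodedFileName("some dir/my artifact.jar"));
  }
}
```

Hashing the name only makes the file name safe. Skipping uploads of artifacts that are already staged would more likely need a hash of the artifact contents, which this sketch does not attempt.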





[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110439&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110439
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238189
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110443&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110443
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238229
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110435&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110435
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238157
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110438&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110438
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238271
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@

[jira] [Work logged] (BEAM-4290) ArtifactStagingService that stages to a distributed filesystem

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4290?focusedWorklogId=110441&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110441
 ]

ASF GitHub Bot logged work on BEAM-4290:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:32
Start Date: 09/Jun/18 20:32
Worklog Time Spent: 10m 
  Work Description: jkff commented on a change in pull request #5591: 
[BEAM-4290] Beam File System based ArtifactStagingService
URL: https://github.com/apache/beam/pull/5591#discussion_r194238285
 
 

 ##
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactStagingService.java
 ##
 @@ -0,0 +1,292 @@

[jira] [Comment Edited] (BEAM-3737) Key-aware batching function

2018-06-09 Thread Debasish Das (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507150#comment-16507150
 ] 

Debasish Das edited comment on BEAM-3737 at 6/9/18 8:21 PM:


I saw this mentioned in TFMA, in _AggregateCombineFn 
[https://github.com/tensorflow/model-analysis/blob/master/tensorflow_model_analysis/api/impl/evaluate.py]. 
I am not clear why BatchElements() is needed: groupByKey takes a combiner, which 
should run on both the map and reduce sides. Am I missing something here? Is it 
the case that the Beam Combiner does not run on the map side? [~robertwb], is 
that why you mentioned that we should run the combiner upfront in a ParDo and 
then run groupByKey to achieve map- and reduce-side combining?


was (Author: debasish83):
I saw this mentioned in TFMA. I am also not clear why BatchElements() is 
needed: groupByKey takes a combiner, which should run on both the map and reduce 
sides. Am I missing something here? Is it the case that the Beam Combiner does 
not run on the map side? [~robertwb], is that why you mentioned that we should 
run the combiner upfront in a ParDo and then run groupByKey to achieve map- and 
reduce-side combining?

> Key-aware batching function
> ---
>
> Key: BEAM-3737
> URL: https://issues.apache.org/jira/browse/BEAM-3737
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chuan Yu Foo
>Priority: Major
>
> I have a CombineFn for which add_input has very large overhead. I would like 
> to batch the incoming elements into a large batch before each call to 
> add_input to reduce this overhead. In other words, I would like to do 
> something like: 
> {{elements | GroupByKey() | BatchElements() | CombineValues(MyCombineFn())}}
> Unfortunately, BatchElements is not key-aware, and can't be used after a 
> GroupByKey to batch elements per key. I'm working around this by doing the 
> batching within CombineValues, which makes the CombineFn rather messy. It 
> would be nice if there were a key-aware BatchElements transform which could 
> be used in this context.
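
A rough sketch of the workaround described above, written in plain Python rather than against the Beam API: the combiner buffers incoming elements and only pays the expensive cost once per batch. The names `BatchingSumCombiner`, `_expensive_add`, `batch_size`, and `combine_per_key` are hypothetical. The class only mirrors the create_accumulator / add_input / extract_output shape of a CombineFn and is not Beam code.

```python
class BatchingSumCombiner:
    """Hypothetical stand-in for a CombineFn whose per-call cost is high."""

    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.expensive_calls = 0  # counts how often the costly path runs

    def create_accumulator(self):
        # (running total, buffered elements not yet folded in)
        return (0, [])

    def _expensive_add(self, total, batch):
        # Stand-in for the high-overhead operation, invoked once per batch.
        self.expensive_calls += 1
        return total + sum(batch)

    def add_input(self, accumulator, element):
        total, buffer = accumulator
        buffer = buffer + [element]
        if len(buffer) >= self.batch_size:
            # Buffer is full: fold the whole batch in with one costly call.
            return (self._expensive_add(total, buffer), [])
        return (total, buffer)

    def extract_output(self, accumulator):
        total, buffer = accumulator
        # Flush whatever is still buffered before producing the result.
        return self._expensive_add(total, buffer) if buffer else total


def combine_per_key(grouped, combiner):
    """Run the combiner over each key's values (a dict of key -> list)."""
    result = {}
    for key, values in grouped.items():
        acc = combiner.create_accumulator()
        for value in values:
            acc = combiner.add_input(acc, value)
        result[key] = combiner.extract_output(acc)
    return result
```

The buffering lives inside the accumulator, which is what makes the CombineFn "rather messy": a key-aware batching transform would move that bookkeeping out of the combiner.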



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3737) Key-aware batching function

2018-06-09 Thread Debasish Das (JIRA)


[ 
https://issues.apache.org/jira/browse/BEAM-3737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16507150#comment-16507150
 ] 

Debasish Das commented on BEAM-3737:


I saw this mentioned in TFMA. I am also not clear why BatchElements() is 
needed: groupByKey takes a combiner, which should run on both the map and reduce 
sides. Am I missing something here? Is it the case that the Beam Combiner does 
not run on the map side? [~robertwb], is that why you mentioned that we should 
run the combiner upfront in a ParDo and then run groupByKey to achieve map- and 
reduce-side combining?

> Key-aware batching function
> ---
>
> Key: BEAM-3737
> URL: https://issues.apache.org/jira/browse/BEAM-3737
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py-core
>Reporter: Chuan Yu Foo
>Priority: Major
>
> I have a CombineFn for which add_input has very large overhead. I would like 
> to batch the incoming elements into a large batch before each call to 
> add_input to reduce this overhead. In other words, I would like to do 
> something like: 
> {{elements | GroupByKey() | BatchElements() | CombineValues(MyCombineFn())}}
> Unfortunately, BatchElements is not key-aware, and can't be used after a 
> GroupByKey to batch elements per key. I'm working around this by doing the 
> batching within CombineValues, which makes the CombineFn rather messy. It 
> would be nice if there were a key-aware BatchElements transform which could 
> be used in this context.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4475) Go precommit should include "go test ./..."

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4475?focusedWorklogId=110432=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110432
 ]

ASF GitHub Bot logged work on BEAM-4475:


Author: ASF GitHub Bot
Created on: 09/Jun/18 20:18
Start Date: 09/Jun/18 20:18
Worklog Time Spent: 10m 
  Work Description: jkff closed pull request #5561: [BEAM-4475] Make Go 
precommit build all Go code
URL: https://github.com/apache/beam/pull/5561
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 905fe551650..2bba72cd233 100644
--- a/build.gradle
+++ b/build.gradle
@@ -152,6 +152,15 @@ task javaPostCommit() {
 task goPreCommit() {
   dependsOn ":rat"
   dependsOn ":beam-sdks-go:test"
+  dependsOn ":beam-sdks-go-examples:build"
+  dependsOn ":beam-sdks-go-test:build"
+
+  // Ensure all container Go boot code builds as well.
+  dependsOn ":beam-sdks-java-container:build"
+  dependsOn ":beam-sdks-python-container:build"
+  dependsOn ":beam-sdks-go-container:build"
+  dependsOn ":beam-runners-gcp-gcemd:build"
+  dependsOn ":beam-runners-gcp-gcsproxy:build"
 }
 
 task goPostCommit() {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 110432)
Time Spent: 0.5h  (was: 20m)

> Go precommit should include "go test ./..."
> ---
>
> Key: BEAM-4475
> URL: https://issues.apache.org/jira/browse/BEAM-4475
> Project: Beam
>  Issue Type: Task
>  Components: build-system
>Reporter: Henning Rohde
>Assignee: Henning Rohde
>Priority: Minor
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> It would have prevented a recent break caused by a green PR: 
> https://github.com/apache/beam/pull/5558.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-775) Remove Aggregators from the Java SDK

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-775?focusedWorklogId=110430=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110430
 ]

ASF GitHub Bot logged work on BEAM-775:
---

Author: ASF GitHub Bot
Created on: 09/Jun/18 19:54
Start Date: 09/Jun/18 19:54
Worklog Time Spent: 10m 
  Work Description: zorro786 commented on issue #2184: [BEAM-775] Remove 
Aggregators from PipelineResults and Examples in Java SDK
URL: https://github.com/apache/beam/pull/2184#issuecomment-395994533
 
 
   Thanks. What about calculating mean values?
   
   On Thu, 7 Jun 2018 at 21:31, Pablo  wrote:
   
   > Hey! Yes, the way to do that now is by using `Metrics.counter`. Glad to
   > help further if you have questions!
   >
   > On Thu, Jun 7, 2018, 7:18 PM Abdul Qadeer 
   > wrote:
   >
   > > @pabloem  I was using Aggregator to keep a
   > > counter across InputT bundles. What should be used now? Does Counter
   > > work in my case?
   > >
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 110430)
Time Spent: 50m  (was: 40m)

> Remove Aggregators from the Java SDK
> 
>
> Key: BEAM-775
> URL: https://issues.apache.org/jira/browse/BEAM-775
> Project: Beam
>  Issue Type: Sub-task
>  Components: sdk-java-core
>Reporter: Ben Chambers
>Assignee: Pablo Estrada
>Priority: Major
>  Labels: backward-incompatible
> Fix For: 2.0.0
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: beam_PerformanceTests_HadoopInputFormat #373

2018-06-09 Thread Apache Jenkins Server
See 


--
[...truncated 130.05 KB...]
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.lang.Thread.run(Thread.java:748)

Jun 09, 2018 6:05:28 PM org.postgresql.Driver connect
SEVERE: Connection error: 
org.postgresql.util.PSQLException: The connection attempt failed.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:257)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:195)
at org.postgresql.Driver.makeConnection(Driver.java:452)
at org.postgresql.Driver.connect(Driver.java:254)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:247)
at org.postgresql.ds.common.BaseDataSource.getConnection(BaseDataSource.java:94)
at org.postgresql.ds.common.BaseDataSource.getConnection(BaseDataSource.java:79)
at org.apache.beam.sdk.io.common.DatabaseTestHelper.deleteTable(DatabaseTestHelper.java:57)
at org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIOIT.tearDown(HadoopInputFormatIOIT.java:124)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:33)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at 

[jira] [Work logged] (BEAM-4519) Artifact Retrieval Service Protocol should be able to serve multiple Manifests.

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4519?focusedWorklogId=110415=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110415
 ]

ASF GitHub Bot logged work on BEAM-4519:


Author: ASF GitHub Bot
Created on: 09/Jun/18 17:21
Start Date: 09/Jun/18 17:21
Worklog Time Spent: 10m 
  Work Description: angoenka commented on issue #5582: [BEAM-4519] Add 
staging_token to GetManifestRequest
URL: https://github.com/apache/beam/pull/5582#issuecomment-395985341
 
 
   https://github.com/apache/beam/pull/5597 should have fixed it.
   Please rebase.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 110415)
Time Spent: 2h 20m  (was: 2h 10m)

> Artifact Retrieval Service Protocol should be able to serve multiple 
> Manifests.
> ---
>
> Key: BEAM-4519
> URL: https://issues.apache.org/jira/browse/BEAM-4519
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Axel Magnuson
>Assignee: Axel Magnuson
>Priority: Minor
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> The artifact staging service currently returns a staging_token that can be 
> used as a key to access a manifest.  However, the current protocol does not 
> have a field that accepts this token.
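
To illustrate the gap described above, here is a minimal in-memory sketch in Python. `InMemoryArtifactService`, `commit_manifest`, and `get_manifest` are hypothetical names, and generating the token with `uuid` is an assumption made for the example. It is not the Beam protocol, only the idea that the retrieval request must carry the token so that one service can serve several manifests.

```python
import uuid


class InMemoryArtifactService:
    """Hypothetical in-memory stand-in for staging plus retrieval."""

    def __init__(self):
        self._manifests = {}  # staging_token -> manifest

    def commit_manifest(self, manifest):
        # Staging side: hand back a token that identifies this manifest.
        staging_token = uuid.uuid4().hex
        self._manifests[staging_token] = list(manifest)
        return staging_token

    def get_manifest(self, staging_token):
        # Retrieval side: the request carries the token, so a single
        # service instance can serve any number of manifests.
        try:
            return self._manifests[staging_token]
        except KeyError:
            raise KeyError("unknown staging_token: %r" % (staging_token,)) from None
```

Without a token parameter on the retrieval call, the service could only ever return one manifest, which is the limitation the issue reports.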



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4449) Use Calc instead of Project and Filter separately

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4449?focusedWorklogId=110409=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110409
 ]

ASF GitHub Bot logged work on BEAM-4449:


Author: ASF GitHub Bot
Created on: 09/Jun/18 13:53
Start Date: 09/Jun/18 13:53
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #5544: [BEAM-4449] 
Replace project and filter with calc
URL: https://github.com/apache/beam/pull/5544#issuecomment-395971148
 
 
   run java precommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 110409)
Time Spent: 5.5h  (was: 5h 20m)

> Use Calc instead of Project and Filter separately
> -
>
> Key: BEAM-4449
> URL: https://issues.apache.org/jira/browse/BEAM-4449
> Project: Beam
>  Issue Type: Improvement
>  Components: dsl-sql
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Calcite has a combined Calc operator that is amenable to more optimization, 
> and also means less code to manage as we adjust how the operators/expressions 
> are implemented.
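
As a rough illustration of the idea, in Python and not Calcite's implementation: a combined operator applies the filter condition and the projections to each row in a single pass, where separate Filter and Project operators would each traverse the rows. The function names and the dict-based row representation are assumptions made for this sketch.

```python
def filter_then_project(rows, condition, projections):
    """Two separate operators: a Filter pass followed by a Project pass."""
    kept = [row for row in rows if condition(row)]
    return [tuple(p(row) for p in projections) for row in kept]


def calc(rows, condition, projections):
    """One combined operator: test and project each row in a single pass."""
    return [
        tuple(p(row) for p in projections)
        for row in rows
        if condition is None or condition(row)  # None means "no filter"
    ]
```

Both functions return the same rows. The combined form leaves one operator to maintain instead of two.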



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4528) ReferenceRunner pipelineExecution flake

2018-06-09 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created BEAM-4528:
-

 Summary: ReferenceRunner pipelineExecution flake
 Key: BEAM-4528
 URL: https://issues.apache.org/jira/browse/BEAM-4528
 Project: Beam
  Issue Type: Bug
  Components: runner-direct
Reporter: Kenneth Knowles
Assignee: Henning Rohde


https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6303/testReport/junit/org.apache.beam.runners.direct.portable/ReferenceRunnerTest/pipelineExecution/

{code}
java.lang.IllegalStateException: sendHeaders has already been called
at org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
at io.grpc.internal.ServerCallImpl.sendHeaders(ServerCallImpl.java:104)
at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:282)
at org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver.close(BeamFnDataBufferingOutboundObserver.java:112)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$ActiveBundle.close(SdkHarnessClient.java:224)
at org.apache.beam.runners.direct.portable.RemoteStageEvaluatorFactory$RemoteStageEvaluator.finishBundle(RemoteStageEvaluatorFactory.java:85)
at org.apache.beam.runners.direct.portable.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:154)
at org.apache.beam.runners.direct.portable.DirectTransformExecutor.run(DirectTransformExecutor.java:103)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.IllegalStateException: Processing bundle failed, TODO: [BEAM-3962] abort bundle.
{code}

std err:
{code}
Jun 09, 2018 5:21:31 AM org.apache.beam.sdk.coders.SerializableCoder checkEqualsMethodDefined
WARNING: Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner
Jun 09, 2018 5:21:31 AM org.apache.beam.sdk.coders.SerializableCoder checkEqualsMethodDefined
WARNING: Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (BEAM-4527) JMSIOTest testCheckpointMark flake

2018-06-09 Thread Kenneth Knowles (JIRA)
Kenneth Knowles created BEAM-4527:
-

 Summary: JMSIOTest testCheckpointMark flake
 Key: BEAM-4527
 URL: https://issues.apache.org/jira/browse/BEAM-4527
 Project: Beam
  Issue Type: Bug
  Components: io-java-jms
Reporter: Kenneth Knowles
Assignee: Jean-Baptiste Onofré


https://builds.apache.org/job/beam_PreCommit_Java_GradleBuild/6301/testReport/junit/org.apache.beam.sdk.io.jms/JmsIOTest/testCheckpointMark/

{code}
java.lang.AssertionError: expected:<6> but was:<9>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at org.apache.beam.sdk.io.jms.JmsIOTest.testCheckpointMark(JmsIOTest.java:318)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-4365) SQL operator argument evaluation should have one place where it is managed

2018-06-09 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/BEAM-4365?focusedWorklogId=110408=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-110408
 ]

ASF GitHub Bot logged work on BEAM-4365:


Author: ASF GitHub Bot
Created on: 09/Jun/18 13:49
Start Date: 09/Jun/18 13:49
Worklog Time Spent: 10m 
  Work Description: kennknowles commented on issue #5433: [BEAM-4365] Make 
BeamSqlExpression for operators, use it for string operators
URL: https://github.com/apache/beam/pull/5433#issuecomment-395970864
 
 
   run java precommit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 110408)
Time Spent: 3h 10m  (was: 3h)

> SQL operator argument evaluation should have one place where it is managed
> --
>
> Key: BEAM-4365
> URL: https://issues.apache.org/jira/browse/BEAM-4365
> Project: Beam
>  Issue Type: Bug
>  Components: dsl-sql
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>Priority: Major
>  Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> The way Beam SQL is factored, each operator has to explicitly ask for its 
> arguments to be evaluated. This should be handled generically at a higher 
> level. Since the language is pure and terminating, it is fine for evaluation 
> strategies to vary between operators, but given the simplicity of the 
> expression language it makes sense to use simple call-by-value.
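
The call-by-value approach described above could look roughly like the following sketch, written in Python for brevity and not in Beam's Java. `Literal`, `Call`, `evaluate`, `upper`, and `concat` are hypothetical names, not Beam SQL classes. The point is only that operands are evaluated in one place and operators receive plain values.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class Literal:
    value: Any


@dataclass(frozen=True)
class Call:
    operator: Callable[..., Any]  # receives already-evaluated argument values
    operands: Tuple[Any, ...]     # Literal or Call nodes


def evaluate(expr):
    """Evaluate an expression tree with call-by-value semantics."""
    if isinstance(expr, Literal):
        return expr.value
    # The single place where operands are evaluated; operators never recurse.
    args = [evaluate(operand) for operand in expr.operands]
    return expr.operator(*args)


def upper(s):
    return s.upper()


def concat(a, b):
    return a + b
```

For example, `evaluate(Call(concat, (Call(upper, (Literal("beam"),)), Literal(" sql"))))` evaluates the inner `upper` call first and then passes its result to `concat`. Neither operator function touches the expression tree.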



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : beam_PostCommit_Java_GradleBuild #715

2018-06-09 Thread Apache Jenkins Server
See 




Build failed in Jenkins: beam_PostCommit_Java_GradleBuild #714

2018-06-09 Thread Apache Jenkins Server
See 


--
[...truncated 17.96 MB...]

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_OUT
Dataflow SDK version: 2.6.0-SNAPSHOT

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_ERROR
Jun 09, 2018 6:38:18 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To access the Dataflow monitoring console, please navigate to 
https://console.cloud.google.com/dataflow/jobsDetail/locations/us-central1/jobs/2018-06-08_23_38_17-413634903865142875?project=apache-beam-testing

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_OUT
Submitted job: 2018-06-08_23_38_17-413634903865142875

org.apache.beam.sdk.io.gcp.spanner.SpannerWriteIT > testReportFailures 
STANDARD_ERROR
Jun 09, 2018 6:38:18 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: To cancel the job using the 'gcloud' tool, run:
> gcloud dataflow jobs --project=apache-beam-testing cancel 
--region=us-central1 2018-06-08_23_38_17-413634903865142875
Jun 09, 2018 6:38:18 AM org.apache.beam.runners.dataflow.TestDataflowRunner 
run
INFO: Running Dataflow job 2018-06-08_23_38_17-413634903865142875 with 0 
expected assertions.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:17.435Z: Autoscaling is enabled for job 
2018-06-08_23_38_17-413634903865142875. The number of workers will be between 1 
and 1000.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:17.456Z: Autoscaling was automatically enabled for 
job 2018-06-08_23_38_17-413634903865142875.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:19.747Z: Checking required Cloud APIs are enabled.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:19.862Z: Checking permissions granted to controller 
Service Account.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:22.766Z: Worker configuration: n1-standard-1 in 
us-central1-c.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.102Z: Expanding CoGroupByKey operations into 
optimizable parts.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.268Z: Expanding GroupByKey operations into 
optimizable parts.
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.292Z: Lifting ValueCombiningMappingFns into 
MergeBucketsMappingFns
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.464Z: Fusing adjacent ParDo, Read, Write, and 
Flatten operations
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.487Z: Elided trivial flatten 
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.517Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Wait.OnSignal/Wait/Map into SpannerIO.Write/Write 
mutations to Cloud Spanner/Create seed/Read(CreateSource)
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.545Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Read information schema into SpannerIO.Write/Write 
mutations to Cloud Spanner/Wait.OnSignal/Wait/Map
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.574Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Schema 
View/Combine.GloballyAsSingletonView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/BatchViewOverrides.GroupByKeyAndSortValuesOnly/Write
 into SpannerIO.Write/Write mutations to Cloud Spanner/Schema 
View/Combine.GloballyAsSingletonView/BatchViewOverrides.GroupByWindowHashAsKeyAndWindowAsSortKey/ParDo(UseWindowHashAsKeyAndWindowAsSortKey)
Jun 09, 2018 6:38:29 AM 
org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2018-06-09T06:38:23.603Z: Fusing consumer SpannerIO.Write/Write 
mutations to Cloud Spanner/Schema 
View/Combine.GloballyAsSingletonView/ParDo(IsmRecordForSingularValuePerWindow) 
into SpannerIO.Write/Write mutations to Cloud Spanner/Schema