[GitHub] beam pull request #3854: Speeds up CompressedSourceTest

2017-09-14 Thread jkff
GitHub user jkff opened a pull request:

https://github.com/apache/beam/pull/3854

Speeds up CompressedSourceTest

This test doesn't need a TestPipeline. This makes the test at least 10x 
faster and less flaky.
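
    For context, a minimal sketch of the pattern, assuming the test's own
    ByteSource helper and a hypothetical tmpFile (not taken from the patch):

    // Instead of p.apply(Read.from(source)) + p.run() on a TestPipeline,
    // read the source directly, in-process:
    CompressedSource<Byte> source =
        CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1))
            .withDecompression(CompressedSource.CompressionMode.GZIP);
    List<Byte> actual =
        SourceTestUtils.readFromSource(source, PipelineOptionsFactory.create());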

R: @lukecwik 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkff/incubator-beam compressed-source

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3854.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3854


commit 952a1c77c6cbec79c2a131812e2a3d144b906d6a
Author: Eugene Kirpichov 
Date:   2017-09-15T00:51:28Z

Speeds up CompressedSourceTest




---


[GitHub] beam pull request #3703: [BEAM-1637] Create Elasticsearch IO compatible with...

2017-09-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3703


---


[jira] [Commented] (BEAM-1637) Create Elasticsearch IO compatible with ES 5.x

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167154#comment-16167154
 ] 

ASF GitHub Bot commented on BEAM-1637:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3703


> Create Elasticsearch IO compatible with ES 5.x
> --
>
> Key: BEAM-1637
> URL: https://issues.apache.org/jira/browse/BEAM-1637
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-extensions
>Reporter: Etienne Chauchot
>Assignee: Etienne Chauchot
>Priority: Minor
>
> The current Elasticsearch IO (see 
> https://issues.apache.org/jira/browse/BEAM-425) is only compatible with 
> Elasticsearch v 2.x. The aim is to have an IO compatible with ES v 5.x. 
> Beyond being able to address v5.x Elasticsearch instances, we could also 
> leverage the Elasticsearch pipeline API and better split the 
> dataset (be as close as possible to desiredBundleSize) thanks to the new ES 
> split API that allows ES shards splitting.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[3/4] beam git commit: Piggyback: deflakes MongoDBGridFSIOTest which failed during merge

2017-09-14 Thread jkff
Piggyback: deflakes MongoDBGridFSIOTest which failed during merge


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c21cab5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c21cab5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c21cab5

Branch: refs/heads/master
Commit: 6c21cab54c22b8ed4d6ccbcd20c27cdb64277b7c
Parents: 0d4fd19
Author: Eugene Kirpichov 
Authored: Thu Sep 14 17:22:33 2017 -0700
Committer: Eugene Kirpichov 
Committed: Thu Sep 14 17:22:33 2017 -0700

--
 .../java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/6c21cab5/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
--
diff --git 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
index 826af1c..19f8d87 100644
--- 
a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
+++ 
b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java
@@ -233,7 +233,7 @@ public class MongoDBGridFSIOTest implements Serializable {
                     }
                   }
                 })
-            .withSkew(new Duration(3601000L))
+            .withSkew(new Duration(361L))
             .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
 
     PAssert.thatSingleton(output.apply("Count All", Count.<KV<String, Integer>>globally()))



[4/4] beam git commit: This closes #3703: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x

2017-09-14 Thread jkff
This closes #3703: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/307b0368
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/307b0368
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/307b0368

Branch: refs/heads/master
Commit: 307b03680ba8b6278d80a07b1ce73421804aa994
Parents: e7601aa 6c21cab
Author: Eugene Kirpichov 
Authored: Thu Sep 14 17:24:49 2017 -0700
Committer: Eugene Kirpichov 
Committed: Thu Sep 14 17:24:49 2017 -0700

--
 pom.xml |  22 ++
 .../elasticsearch-tests-2/pom.xml   |  60 
 .../src/test/contrib/create_elk_container.sh|  24 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 123 +++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 185 ++
 .../elasticsearch-tests-5/pom.xml   | 126 +++
 .../src/test/contrib/create_elk_container.sh|  24 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 122 +++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 184 ++
 .../org/elasticsearch/bootstrap/JarHell.java|  39 ++
 .../elasticsearch-tests-common/pom.xml  |  81 +
 .../elasticsearch/ElasticSearchIOTestUtils.java | 142 
 .../elasticsearch/ElasticsearchIOITCommon.java  |  92 +
 .../ElasticsearchIOTestCommon.java  | 306 
 sdks/java/io/elasticsearch-tests/pom.xml| 144 
 sdks/java/io/elasticsearch/pom.xml  | 234 +---
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 229 +++-
 .../beam/sdk/io/elasticsearch/package-info.java |   1 -
 .../src/test/contrib/create_elk_container.sh|  24 --
 .../elasticsearch/ElasticSearchIOTestUtils.java | 138 ---
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 155 
 .../io/elasticsearch/ElasticsearchIOTest.java   | 355 ---
 .../elasticsearch/ElasticsearchTestDataSet.java |  97 -
 .../sdk/io/mongodb/MongoDBGridFSIOTest.java |   2 +-
 sdks/java/io/pom.xml|   1 +
 sdks/java/javadoc/pom.xml   |  10 +
 26 files changed, 1926 insertions(+), 994 deletions(-)
--




[1/4] beam git commit: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x

2017-09-14 Thread jkff
Repository: beam
Updated Branches:
  refs/heads/master e7601aac3 -> 307b03680


http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
--
diff --git 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 5041eec..5eebe00 100644
--- 
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++ 
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.security.KeyStore;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,7 +64,6 @@ import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.entity.ContentType;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.message.BasicHeader;
 import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
 import org.apache.http.nio.entity.NStringEntity;
 import org.apache.http.ssl.SSLContexts;
@@ -73,7 +73,6 @@ import org.elasticsearch.client.RestClientBuilder;
 
 /**
  * Transforms for reading and writing data from/to Elasticsearch.
- * This IO is only compatible with Elasticsearch v2.x
  *
  * <h3>Reading from Elasticsearch</h3>
  *
@@ -145,6 +144,7 @@ public class ElasticsearchIO {
 
   private static final ObjectMapper mapper = new ObjectMapper();
 
+  @VisibleForTesting
   static JsonNode parseResponse(Response response) throws IOException {
 return mapper.readValue(response.getEntity().getContent(), JsonNode.class);
   }
@@ -153,23 +153,23 @@
   @AutoValue
   public abstract static class ConnectionConfiguration implements Serializable {
 
-    abstract List<String> getAddresses();
+    public abstract List<String> getAddresses();
 
     @Nullable
-    abstract String getUsername();
+    public abstract String getUsername();
 
     @Nullable
-    abstract String getPassword();
+    public abstract String getPassword();
 
     @Nullable
-    abstract String getKeystorePath();
+    public abstract String getKeystorePath();
 
     @Nullable
-    abstract String getKeystorePassword();
+    public abstract String getKeystorePassword();
 
-    abstract String getIndex();
+    public abstract String getIndex();
 
-    abstract String getType();
+    public abstract String getType();
 
     abstract Builder builder();
@@ -267,6 +267,7 @@
       builder.addIfNotNull(DisplayData.item("keystore.path", getKeystorePath()));
     }
 
+    @VisibleForTesting
     RestClient createClient() throws IOException {
       HttpHost[] hosts = new HttpHost[getAddresses().size()];
       int i = 0;
@@ -399,9 +400,8 @@
       checkState(
           connectionConfiguration != null,
           "withConnectionConfiguration() is required");
-      checkVersion(connectionConfiguration);
-      return input.apply(
-          org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null)));
+      return input.apply(org.apache.beam.sdk.io.Read
+          .from(new BoundedElasticsearchSource(this, null, null, null)));
     }
 
 @Override
@@ -416,55 +416,94 @@
 
   /** A {@link BoundedSource} reading from Elasticsearch. */
   @VisibleForTesting
-  static class BoundedElasticsearchSource extends BoundedSource<String> {
+  public static class BoundedElasticsearchSource extends BoundedSource<String> {
 
-    private final ElasticsearchIO.Read spec;
-    // shardPreference is the shard number where the source will read the documents
-    @Nullable private final String shardPreference;
+    private int backendVersion;
 
-    BoundedElasticsearchSource(Read spec, @Nullable String shardPreference) {
+    private final Read spec;
+    // shardPreference is the shard id where the source will read the documents
+    @Nullable
+    private final String shardPreference;
+    @Nullable
+    private final Integer numSlices;
+    @Nullable
+    private final Integer sliceId;
+
+    //constructor used in split() when we know the backend version
+    private BoundedElasticsearchSource(Read spec, @Nullable String shardPreference,
+        @Nullable Integer numSlices, @Nullable Integer sliceId, int backendVersion) {
+      this.backendVersion = backendVersion;
       this.spec = spec;
       this.shardPreference = shardPreference;
+      this.numSlices = numSlices;
+      this.sliceId = sliceId;
     }
 
+    @VisibleForTesting
+    BoundedElasticsearchSource(Read spec, @Nullable String 

[2/4] beam git commit: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x

2017-09-14 Thread jkff
[BEAM-1637] Create Elasticsearch IO compatible with ES 5.x


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0d4fd190
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0d4fd190
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0d4fd190

Branch: refs/heads/master
Commit: 0d4fd19076722515f29c34144cc93aab3795801f
Parents: e7601aa
Author: Etienne Chauchot 
Authored: Mon Jun 26 10:58:21 2017 +0200
Committer: Eugene Kirpichov 
Committed: Thu Sep 14 17:01:27 2017 -0700

--
 pom.xml |  22 ++
 .../elasticsearch-tests-2/pom.xml   |  60 
 .../src/test/contrib/create_elk_container.sh|  24 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 123 +++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 185 ++
 .../elasticsearch-tests-5/pom.xml   | 126 +++
 .../src/test/contrib/create_elk_container.sh|  24 ++
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 122 +++
 .../io/elasticsearch/ElasticsearchIOTest.java   | 184 ++
 .../org/elasticsearch/bootstrap/JarHell.java|  39 ++
 .../elasticsearch-tests-common/pom.xml  |  81 +
 .../elasticsearch/ElasticSearchIOTestUtils.java | 142 
 .../elasticsearch/ElasticsearchIOITCommon.java  |  92 +
 .../ElasticsearchIOTestCommon.java  | 306 
 sdks/java/io/elasticsearch-tests/pom.xml| 144 
 sdks/java/io/elasticsearch/pom.xml  | 234 +---
 .../sdk/io/elasticsearch/ElasticsearchIO.java   | 229 +++-
 .../beam/sdk/io/elasticsearch/package-info.java |   1 -
 .../src/test/contrib/create_elk_container.sh|  24 --
 .../elasticsearch/ElasticSearchIOTestUtils.java | 138 ---
 .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 155 
 .../io/elasticsearch/ElasticsearchIOTest.java   | 355 ---
 .../elasticsearch/ElasticsearchTestDataSet.java |  97 -
 sdks/java/io/pom.xml|   1 +
 sdks/java/javadoc/pom.xml   |  10 +
 25 files changed, 1925 insertions(+), 993 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/pom.xml
--
diff --git a/pom.xml b/pom.xml
index f9644dd..a2d6aae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -462,6 +462,28 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-common</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+        <classifier>tests</classifier>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-2</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
+        <artifactId>beam-sdks-java-io-elasticsearch-tests-5</artifactId>
+        <version>${project.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml
--
diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml
new file mode 100644
index 0000000..a56ffa4
--- /dev/null
+++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-io-elasticsearch-tests-parent</artifactId>
+    <version>2.2.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-io-elasticsearch-tests-2</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 2.x</name>
+  <description>Tests of ElasticsearchIO on Elasticsearch 2.x</description>
+
+  <properties>
+    <elasticsearch.version>2.4.1</elasticsearch.version>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-elasticsearch-tests-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>${elasticsearch.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh
--
diff --git 
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh
 
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh
new file mode 100755
index 0000000..48f6064
--- /dev/null
+++ 

[jira] [Assigned] (BEAM-2834) NullPointerException @ BigQueryServicesImpl.java:759

2017-09-14 Thread Eugene Kirpichov (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eugene Kirpichov reassigned BEAM-2834:
--

Assignee: Reuven Lax  (was: Thomas Groh)

> NullPointerException @ BigQueryServicesImpl.java:759
> 
>
> Key: BEAM-2834
> URL: https://issues.apache.org/jira/browse/BEAM-2834
> Project: Beam
>  Issue Type: Bug
>  Components: runner-dataflow
>Affects Versions: 2.1.0
>Reporter: Andy Barron
>Assignee: Reuven Lax
> Fix For: 2.2.0
>
>
> {code}
> java.lang.NullPointerException
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:759)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126)
>   at 
> org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96)
> {code}
> Going through the stack trace, the likely culprit is a null {{retryPolicy}} 
> in {{StreamingWriteFn}}.
> For context, this error showed up about 70 times between 1 am and 1 pm 
> Pacific time (2017-08-31) on a Dataflow streaming job.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath updated BEAM-2879:
-
Affects Version/s: Not applicable

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Black Phoenix
>Priority: Minor
>  Labels: starter
>
> Before being loaded into BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath updated BEAM-2879:
-
Affects Version/s: (was: 2.1.0)

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Black Phoenix
>Priority: Minor
>  Labels: starter
>
> Before being loaded into BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath updated BEAM-2879:
-
Affects Version/s: (was: Not applicable)

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Reporter: Black Phoenix
>Priority: Minor
>  Labels: starter
>
> Before being loaded into BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2857) Create FileIO in Python

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath updated BEAM-2857:
-
Labels: starter  (was: )

> Create FileIO in Python
> ---
>
> Key: BEAM-2857
> URL: https://issues.apache.org/jira/browse/BEAM-2857
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py
>Reporter: Eugene Kirpichov
>  Labels: starter
>
> Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), 
> which together cover the majority of needs for general-purpose file 
> ingestion. Beam Python should have something similar.
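
For reference, a hedged sketch of the Java API shape being mirrored (the file
pattern and pipeline are hypothetical; fluent names as in Beam's Java FileIO):

{code}
PCollection<String> contents =
    pipeline
        .apply(FileIO.match().filepattern("/data/logs/*.csv"))
        .apply(FileIO.readMatches())
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((FileIO.ReadableFile f) -> {
              try {
                return f.readFullyAsUTF8String();  // whole file as one element
              } catch (IOException e) {
                throw new RuntimeException(e);
              }
            }));
{code}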



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2857) Create FileIO in Python

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-2857:


Assignee: (was: Ahmet Altay)

> Create FileIO in Python
> ---
>
> Key: BEAM-2857
> URL: https://issues.apache.org/jira/browse/BEAM-2857
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py
>Reporter: Eugene Kirpichov
>  Labels: starter
>
> Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), 
> which together cover the majority of needs for general-purpose file 
> ingestion. Beam Python should have something similar.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2857) Create FileIO in Python

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-2857:


Assignee: Ahmet Altay  (was: Chamikara Jayalath)

> Create FileIO in Python
> ---
>
> Key: BEAM-2857
> URL: https://issues.apache.org/jira/browse/BEAM-2857
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-py
>Reporter: Eugene Kirpichov
>Assignee: Ahmet Altay
>  Labels: starter
>
> Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), 
> which together cover the majority of needs for general-purpose file 
> ingestion. Beam Python should have something similar.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath updated BEAM-2879:
-
Labels: starter  (was: )

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Black Phoenix
>Priority: Minor
>  Labels: starter
>
> Before being loaded into BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2017-09-14 Thread Chamikara Jayalath (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chamikara Jayalath reassigned BEAM-2879:


Assignee: (was: Chamikara Jayalath)

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Black Phoenix
>Priority: Minor
>  Labels: starter
>
> Before being loaded into BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery

2017-09-14 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166960#comment-16166960
 ] 

Chamikara Jayalath commented on BEAM-2879:
--

Thanks for creating this.

Looks like this is low-hanging fruit for improving performance.

[~kirpichov] [~reuvenlax] is there any reason why we didn't do this so far?
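
For illustration, the shape of the Avro alternative (a hedged sketch; the
schema, values, and file name are made up): rows written as Avro datums in a
container file rather than one JSON TableRow per line:

{code}
import java.io.File;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

void writeLoadFile() throws IOException {
  Schema schema = SchemaBuilder.record("Row").fields()
      .requiredString("word")
      .requiredLong("count")
      .endRecord();
  try (DataFileWriter<GenericRecord> writer =
      new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
    writer.create(schema, new File("bq-load-temp.avro"));
    GenericRecord row = new GenericData.Record(schema);
    row.put("word", "beam");
    row.put("count", 42L);
    writer.append(row);  // compact binary encoding vs. a JSON TableRow line
  }
}
{code}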

> Implement and use an Avro coder rather than the JSON one for intermediary 
> files to be loaded in BigQuery
> 
>
> Key: BEAM-2879
> URL: https://issues.apache.org/jira/browse/BEAM-2879
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-gcp
>Affects Versions: 2.1.0
>Reporter: Black Phoenix
>Assignee: Chamikara Jayalath
>Priority: Minor
>  Labels: starter
>
> Before being loaded into BigQuery, temporary files are created and encoded in 
> JSON, which is a costly solution compared to an Avro alternative.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2808) Improve error message when DoFn @ProcessElement has the wrong window type

2017-09-14 Thread Luke Cwik (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik reassigned BEAM-2808:
---

Assignee: Daniel Oliveira  (was: Luke Cwik)

> Improve error message when DoFn @ProcessElement has the wrong window type
> -
>
> Key: BEAM-2808
> URL: https://issues.apache.org/jira/browse/BEAM-2808
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Daniel Oliveira
>Priority: Minor
>  Labels: starter
>
> The message today is something like this:
> {code}
> processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow)
>  expects window type org.apache.beam.sdk.transforms.windowing.IntervalWindow, 
> which is not a supertype of actual window type 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow
> {code}
> Someone suggested this:
> {code}
> processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow)
>  unable to provide window --  expects window type 
> org.apache.beam.sdk.transforms.windowing.IntervalWindow (from parameter), 
> which is not a supertype of actual window type 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow (assigned by windowing)
> {code}
> If anyone wants to pick this up and wordsmith it more, strip some of the 
> namespaces (where reasonable), etc, that would be great.
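
For context, a sketch of the kind of DoFn that triggers this message: the
parameter asks for IntervalWindow while the input is in the global window.
Names are hypothetical.

{code}
// Input is in the GlobalWindow (the default), but the @ProcessElement
// parameter demands an IntervalWindow, so construction fails with the
// message quoted above.
input.apply(ParDo.of(new DoFn<String, String>() {
  @ProcessElement
  public void processElement(ProcessContext c, IntervalWindow window) {
    c.output(c.element() + " @ " + window.maxTimestamp());
  }
}));
{code}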



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (BEAM-2808) Improve error message when DoFn @ProcessElement has the wrong window type

2017-09-14 Thread Luke Cwik (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik reassigned BEAM-2808:
---

Assignee: Luke Cwik

> Improve error message when DoFn @ProcessElement has the wrong window type
> -
>
> Key: BEAM-2808
> URL: https://issues.apache.org/jira/browse/BEAM-2808
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Kenneth Knowles
>Assignee: Luke Cwik
>Priority: Minor
>  Labels: starter
>
> The message today is something like this:
> {code}
> processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow)
>  expects window type org.apache.beam.sdk.transforms.windowing.IntervalWindow, 
> which is not a supertype of actual window type 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow
> {code}
> Someone suggested this:
> {code}
> processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow)
>  unable to provide window --  expects window type 
> org.apache.beam.sdk.transforms.windowing.IntervalWindow (from parameter), 
> which is not a supertype of actual window type 
> org.apache.beam.sdk.transforms.windowing.GlobalWindow (assigned by windowing)
> {code}
> If anyone wants to pick this up and wordsmith it more, strip some of the 
> namespaces (where reasonable), etc, that would be great.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-09-14 Thread Jacob Marble (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166841#comment-16166841
 ] 

Jacob Marble commented on BEAM-2500:


Luke, thanks! I may have found what you're referencing in 
AbstractGoogleAsyncWriteChannel.uploadBufferSize. The default is 64MB, or 8MB 
when the JVM's max memory is under 512MB:

GCS_UPLOAD_GRANULARITY = 8 * 1024 * 1024;
UPLOAD_CHUNK_SIZE_DEFAULT =
    Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024
        ? GCS_UPLOAD_GRANULARITY : 8 * GCS_UPLOAD_GRANULARITY;

> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-09-14 Thread Luke Cwik (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166823#comment-16166823
 ] 

Luke Cwik commented on BEAM-2500:
-

The GCS implementation uses a fixed-size buffer of 32 or 64MB (don't remember 
which), so buffering in memory is a common practice. 500MB does seem like a 
lot, but even if you went with 32MB parts, you could support files up to ~320GB.
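
For illustration, the arithmetic behind that figure, given S3's 10,000-part
limit on multipart uploads (only the limit is from S3; the rest is simple math):

{code}
long partSize = 32L * 1024 * 1024;          // one 32MB part buffered in memory
long maxParts = 10_000;                     // S3 multipart upload part limit
long maxObjectBytes = partSize * maxParts;  // largest writable object
System.out.println(maxObjectBytes / (1L << 30));  // prints 312 (GiB), ~320GB
{code}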

> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-09-14 Thread Jacob Marble (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166803#comment-16166803
 ] 

Jacob Marble commented on BEAM-2500:


Multipart upload could get us around the content-length requirement, but it's 
awkward. An object can be 5TB, and a multipart upload can have 10,000 parts, so 
I could read 500MB at a time into memory and ship those chunks. Bad idea.

Still can't see how Beam can indicate content length to a FileSystem sink. I'll 
move on to source stuff for a while.

> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2795) FlinkRunner: translate using SDK-agnostic means

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166792#comment-16166792
 ] 

ASF GitHub Bot commented on BEAM-2795:
--

Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3813


> FlinkRunner: translate using SDK-agnostic means
> ---
>
> Key: BEAM-2795
> URL: https://issues.apache.org/jira/browse/BEAM-2795
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-flink
>Reporter: Kenneth Knowles
>Assignee: Kenneth Knowles
>  Labels: portability
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3813: [BEAM-2795] Key FlinkRunner streaming translation o...

2017-09-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3813


---


[2/2] beam git commit: This closes #3813: [BEAM-2795] Key FlinkRunner streaming translation off URN

2017-09-14 Thread kenn
This closes #3813: [BEAM-2795] Key FlinkRunner streaming translation off URN


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e7601aac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e7601aac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e7601aac

Branch: refs/heads/master
Commit: e7601aac3a1c9863ea67f305dc9e1b68041d9d8c
Parents: fa4ecea be9fb29
Author: Kenneth Knowles 
Authored: Thu Sep 14 11:23:37 2017 -0700
Committer: Kenneth Knowles 
Committed: Thu Sep 14 11:23:37 2017 -0700

--
 .../construction/PTransformTranslation.java |  27 ++-
 runners/flink/pom.xml   |   5 +
 .../runners/flink/CreateStreamingFlinkView.java |   3 +
 .../FlinkStreamingTransformTranslators.java | 205 +--
 4 files changed, 215 insertions(+), 25 deletions(-)
--




[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-09-14 Thread Jacob Marble (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166741#comment-16166741
 ] 

Jacob Marble commented on BEAM-2500:


Chamikara, thanks for your comment. I'll switch my implementation to multipart 
after I have something working; I just got the simple 5GB version written. I'll 
also give closer consideration to the credentials question after I have the 
harder parts complete. For now, I'm just using flags via PipelineOptions.

So I have completed enough of this to test it out, except one problem. S3 
requires the content length before writing any data, or else the client buffers 
the entire content in memory before writing. I have added contentLength to my 
S3CreateOptions, but how to set that value before S3FileSystem.create() is 
called?
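
One possible shape for this, sketched under stated assumptions (AWS SDK v1
client; the class name and fixed part size are made up; error handling, upload
aborts, and the zero-byte-object case are omitted): a WritableByteChannel that
buffers one part at a time and ships it as a multipart-upload part, so no total
content length is needed up front.

{code}
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.UploadPartRequest;

/** Hypothetical sketch: streams bytes to S3 without knowing the length up front. */
class S3MultipartWriteChannel implements WritableByteChannel {

  private final AmazonS3 s3;
  private final String bucket;
  private final String key;
  private final String uploadId;
  private final ByteBuffer partBuffer;  // holds one part's worth of bytes
  private final List<PartETag> partETags = new ArrayList<>();
  private int nextPartNumber = 1;
  private boolean open = true;

  S3MultipartWriteChannel(AmazonS3 s3, String bucket, String key, int partSizeBytes) {
    this.s3 = s3;
    this.bucket = bucket;
    this.key = key;
    this.partBuffer = ByteBuffer.allocate(partSizeBytes);
    this.uploadId =
        s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key))
            .getUploadId();
  }

  @Override
  public int write(ByteBuffer src) {
    int written = src.remaining();
    while (src.hasRemaining()) {
      if (!partBuffer.hasRemaining()) {
        flushPart();
      }
      // Copy as many bytes as currently fit into the part buffer.
      int n = Math.min(src.remaining(), partBuffer.remaining());
      ByteBuffer slice = src.duplicate();
      slice.limit(slice.position() + n);
      partBuffer.put(slice);
      src.position(src.position() + n);
    }
    return written;
  }

  private void flushPart() {
    partBuffer.flip();
    byte[] part = new byte[partBuffer.remaining()];
    partBuffer.get(part);
    partETags.add(
        s3.uploadPart(new UploadPartRequest()
                .withBucketName(bucket).withKey(key).withUploadId(uploadId)
                .withPartNumber(nextPartNumber++)
                .withPartSize(part.length)
                .withInputStream(new ByteArrayInputStream(part)))
            .getPartETag());
    partBuffer.clear();
  }

  @Override
  public void close() {
    if (!open) {
      return;
    }
    if (partBuffer.position() > 0) {
      flushPart();  // only the final part may be smaller than 5MB
    }
    s3.completeMultipartUpload(
        new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags));
    open = false;
  }

  @Override
  public boolean isOpen() {
    return open;
  }
}
{code}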

> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-09-14 Thread Luke Cwik (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166735#comment-16166735
 ] 

Luke Cwik commented on BEAM-2500:
-

Performing multipart download/upload will become important, since a 5GiB limit 
has limited use, but start off implementing the simpler thing; multipart 
upload/download can come later.

http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html
Amazon supports an efficient copy operation if you specify "x-amz-copy-source" 
as a header where you don't need to upload the bytes and it just adds some 
metadata that points to the same set of bytes. Depending on which Amazon S3 
Java library you use, they may or may not expose this flexibility.
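
For illustration, with the AWS SDK v1 client this server-side copy is a single
call (bucket and key names are hypothetical); the SDK sets the
x-amz-copy-source header for you:

{code}
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CopyObjectRequest;

// No object bytes flow through the client; S3 copies server-side.
// (Objects over 5GB need the part-level CopyPartRequest variant.)
void copyInPlace(AmazonS3 s3) {
  s3.copyObject(new CopyObjectRequest(
      "src-bucket", "path/to/source", "dst-bucket", "path/to/dest"));
}
{code}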

> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2699) AppliedPTransform is used as a key in hashmaps but PTransform is not hashable/equality-comparable

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166701#comment-16166701
 ] 

ASF GitHub Bot commented on BEAM-2699:
--

GitHub user tgroh opened a pull request:

https://github.com/apache/beam/pull/3853

[BEAM-2699] Update AppliedPTransform Equality

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [ ] Each commit in the pull request should have a meaningful subject 
line and body.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---
Use only node name and the pipeline the two transforms are in. Do not
consider the reported equality of inputs, outputs, or transforms.
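
For illustration only (not the actual patch), equality along those lines might
look like the following, assuming AppliedPTransform's getFullName() and
getPipeline() accessors and java.util.Objects:

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof AppliedPTransform)) {
        return false;
      }
      AppliedPTransform<?, ?, ?> that = (AppliedPTransform<?, ?, ?>) other;
      // Only the node name and the owning pipeline matter; inputs, outputs,
      // and the transform itself are deliberately not compared.
      return getFullName().equals(that.getFullName())
          && getPipeline() == that.getPipeline();
    }

    @Override
    public int hashCode() {
      return Objects.hash(getFullName(), System.identityHashCode(getPipeline()));
    }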



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/beam update_application_equality

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3853


commit 6615badce6f3dd2835ac8eac94d792595e86a576
Author: Thomas Groh 
Date:   2017-09-14T17:50:09Z

Update AppliedPTransform Equality

Use only node name and the pipeline the two transforms are in. Do not
consider the reported equality of inputs, outputs, or transforms.




> AppliedPTransform is used as a key in hashmaps but PTransform is not 
> hashable/equality-comparable
> -
>
> Key: BEAM-2699
> URL: https://issues.apache.org/jira/browse/BEAM-2699
> Project: Beam
>  Issue Type: Bug
>  Components: runner-core
>Reporter: Eugene Kirpichov
>Assignee: Thomas Groh
>
> There's plenty of occurrences in runners-core of Map or BiMap where the key 
> is an AppliedPTransform.
> However, PTransform does not advertise that it is required to implement 
> equals/hashCode, and some transforms can't do it properly anyway - for 
> example, transforms that capture a ValueProvider which is also not 
> hashable/eq-comparable. I'm surprised that things aren't already very broken 
> because of this.
> Fundamentally, I don't see why we should ever compare two PTransform's for 
> equality.
> I looked at the code and wondered "can AppliedPTransform simply be 
> identity-hashable", but right now the answer is no because we can create an 
> AppliedPTransform for the same transform applied to the same thing multiple 
> times.
> Fixing that appears to be not very easy, but definitely possible. Ideally 
> TransformHierarchy.Node would just know its AppliedPTransform, however a Node 
> can be constructed when there's yet no Pipeline. Suppose there's gotta be 
> some way to propagate a Pipeline into Node.finishSpecifying() (which should 
> be called exactly once on the Node, and this should be enforced), and have 
> finishSpecifying() return the AppliedPTransform, and have the caller use that 
> instead of potentially repeatedly calling .toAppliedPTransform() on the same 
> Node.
> [~kenn] is on vacation but perhaps [~tgroh] can help with this meanwhile?
> CC: [~reuvenlax]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3853: [BEAM-2699] Update AppliedPTransform Equality

2017-09-14 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/beam/pull/3853

[BEAM-2699] Update AppliedPTransform Equality

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [ ] Each commit in the pull request should have a meaningful subject 
line and body.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---
Use only node name and the pipeline the two transforms are in. Do not
consider the reported equality of inputs, outputs, or transforms.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/beam update_application_equality

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3853.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3853


commit 6615badce6f3dd2835ac8eac94d792595e86a576
Author: Thomas Groh 
Date:   2017-09-14T17:50:09Z

Update AppliedPTransform Equality

Use only node name and the pipeline the two transforms are in. Do not
consider the reported equality of inputs, outputs, or transforms.




---


[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem

2017-09-14 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166678#comment-16166678
 ] 

Chamikara Jayalath commented on BEAM-2500:
--

Thanks for looking into this Jacob. I'll try to answer some of your questions.

I agree that GCS is a good template. It's pretty well battle tested and used by 
all Dataflow pipelines.

A 5GB limit for FileSystem.copy(), though it might be a good start, might not be 
enough for production use. Beam can parallelize reading of large files, so 
there might be many users who are used to reading a small number of large files 
from their pipelines. Note that we use the copy() operation to finalize files 
written using FileBasedSink implementations, so we might need to copy large 
files there as well. It'll be good if copy() can be implemented using multipart 
upload as you mentioned.

FileSystem.create() is to create an empty WritableByteChannel that will be 
written to later. So we'll have to have a way to stream bytes into S3 (some 
implementation of WritableByteChannel). I'm not sure if the S3 client library 
already supports this.

's3' for the scheme sounds good to me. 

For efficient parallelized reading FileSystem.open() should return an 
efficiently seekable SeekableByteChannel.

Using copy() + delete() combination for rename() is fine.

We might have to address issues related to providing credentials for accessing 
S3. See following JIRA where some details related to this were discussed when 
[~demeshchuk] looked into adding a S3 file system for Python SDK.
https://issues.apache.org/jira/browse/BEAM-2572
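
For reference, a minimal sketch of the contract being discussed (S3FileSystem
and S3ResourceId are hypothetical names; method signatures follow the Beam 2.x
org.apache.beam.sdk.io.FileSystem abstract class; bodies are elided):

{code}
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Collection;
import java.util.List;

import org.apache.beam.sdk.io.FileSystem;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;

class S3FileSystem extends FileSystem<S3ResourceId> {

  @Override
  protected String getScheme() {
    return "s3";
  }

  @Override
  protected List<MatchResult> match(List<String> specs) throws IOException {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected WritableByteChannel create(S3ResourceId resourceId, CreateOptions options)
      throws IOException {
    // Must stream bytes out as they are written, e.g. via multipart upload.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected ReadableByteChannel open(S3ResourceId resourceId) throws IOException {
    // For parallelized reads, the returned channel should also be an
    // efficiently seekable SeekableByteChannel.
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected void copy(List<S3ResourceId> src, List<S3ResourceId> dest) throws IOException {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected void rename(List<S3ResourceId> src, List<S3ResourceId> dest) throws IOException {
    copy(src, dest);  // copy() + delete() is acceptable for rename()
    delete(src);
  }

  @Override
  protected void delete(Collection<S3ResourceId> resourceIds) throws IOException {
    throw new UnsupportedOperationException("sketch only");
  }

  @Override
  protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
    throw new UnsupportedOperationException("sketch only");
  }
}
{code}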

> Add support for S3 as a Apache Beam FileSystem
> --
>
> Key: BEAM-2500
> URL: https://issues.apache.org/jira/browse/BEAM-2500
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Luke Cwik
>Priority: Minor
> Attachments: hadoop_fs_patch.patch
>
>
> Note that this is for providing direct integration with S3 as an Apache Beam 
> FileSystem.
> There is already support for using the Hadoop S3 connector by depending on 
> the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with 
> a S3 configuration[3].
> 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system
> 2: 
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53
> 3: https://wiki.apache.org/hadoop/AmazonS3



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint

2017-09-14 Thread Luke Cwik (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1616#comment-1616
 ] 

Luke Cwik commented on BEAM-2948:
-

FileSystems assumes that it has been initialized early on within each "worker". 
If this isn't honored, then we may find other areas where this is also an issue.

Also, once this is executing over the portability framework, this will be a 
non-issue since it will all happen in the SDK harness.
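
As a workaround sketch (the worker-side hook is hypothetical;
FileSystems.setDefaultPipelineOptions is the real SDK entry point), the
registrars need to be initialized before any coder decoding runs:

{code}
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Run once, early, in each worker JVM, before FileResultCoder.decode()
// (or anything else) asks FileSystems to resolve a scheme like "s3n".
void initializeWorker(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
  FileSystems.setDefaultPipelineOptions(options);
}
{code}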

> Unable to find registrar when restoring flink job from savepoint
> 
>
> Key: BEAM-2948
> URL: https://issues.apache.org/jira/browse/BEAM-2948
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
> Environment: Flink v1.2.1 job on a EMR cluster using Beam v2.0.0
>Reporter: Luke Cwik
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Reported: 
> https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E
> When I try to restore job from savepoint on one task manager I get the
> exception *Unable to find registrar for s3n*.
> The job can write files to s3 acting as a sink. So S3 access works except
> when restoring from savepoint.
> I am passing the following configuration as the  pipeline
> HadoopFileSystemOptions
> options:
> Configuration configuration = new Configuration();
> configuration.set("fs.defaultFS", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.default.name", String.format("s3n://%s",
> jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.s3.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> configuration.set("fs.s3n.awsAccessKeyId",
> jobOptions.getAwsAccessKey());
> configuration.set("fs.s3n.awsSecretAccessKey",
> jobOptions.getAwsSecretKey());
> From my investigation it looks like the Beam
> FileSystems.setDefaultPipelineOptions method is not called before
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relies on the initialised
> FileSystems.SCHEME_TO_FILESYSTEM map
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n
> at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020)
> at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
> ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2885) Support job+artifact APIs locally

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166645#comment-16166645
 ] 

ASF GitHub Bot commented on BEAM-2885:
--

GitHub user tgroh opened a pull request:

https://github.com/apache/beam/pull/3852

[BEAM-2885] Add a Local FS implementation of the Artifact Staging API

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [ ] Each commit in the pull request should have a meaningful subject 
line and body.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/beam local_fs_artifact_staging_service

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3852.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3852


commit afa1a5adda499fb65176e5a5d00cd9cda89c49b6
Author: Thomas Groh 
Date:   2017-09-13T20:32:20Z

Add a Local FS implementation of the Artifact Staging API




> Support job+artifact APIs locally
> -
>
> Key: BEAM-2885
> URL: https://issues.apache.org/jira/browse/BEAM-2885
> Project: Beam
>  Issue Type: Sub-task
>  Components: runner-dataflow
>Reporter: Henning Rohde
>Assignee: Thomas Groh
>  Labels: portability
>
> As per https://s.apache.org/beam-job-api, use local support for 
> submission-side. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3852: [BEAM-2885] Add a Local FS implementation of the Ar...

2017-09-14 Thread tgroh
GitHub user tgroh opened a pull request:

https://github.com/apache/beam/pull/3852

[BEAM-2885] Add a Local FS implementation of the Artifact Staging API

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [ ] Each commit in the pull request should have a meaningful subject 
line and body.
 - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [ ] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tgroh/beam local_fs_artifact_staging_service

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3852.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3852


commit afa1a5adda499fb65176e5a5d00cd9cda89c49b6
Author: Thomas Groh 
Date:   2017-09-13T20:32:20Z

Add a Local FS implementation of the Artifact Staging API




---


[jira] [Commented] (BEAM-2955) Create a Cloud Bigtable HBase connector

2017-09-14 Thread Chamikara Jayalath (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166508#comment-16166508
 ] 

Chamikara Jayalath commented on BEAM-2955:
--

I'm fine with option (2) as well. An extra ~300 lines of code shouldn't be much 
of an issue. I think we should take the option that makes more sense 
performance- and usability-wise.


> Create a Cloud Bigtable HBase connector
> ---
>
> Key: BEAM-2955
> URL: https://issues.apache.org/jira/browse/BEAM-2955
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-gcp
>Reporter: Solomon Duskis
>Assignee: Solomon Duskis
>
> The Cloud Bigtable (CBT) team has had a Dataflow connector maintained in a 
> different repo for a while. Recently, we did some reworking of the Cloud 
> Bigtable client that would allow it to better coexist in the Beam ecosystem, 
> and we also released a Beam connector in our repository that exposes HBase 
> idioms rather than the Protobuf idioms of BigtableIO.  More information about 
> the customer experience of the HBase connector can be found here: 
> [https://cloud.google.com/bigtable/docs/dataflow-hbase].
> The Beam repo is a much better place to house a Cloud Bigtable HBase 
> connector.  There are a couple of ways we can implement this new connector:
> # The CBT connector depends on artifacts in the io/hbase maven project.  We 
> can create a new connector that extends HBaseIO for the purposes of CBT.  We would have to 
> add some features to HBaseIO to make that work (dynamic rebalancing, and a 
> way for HBase and CBT's size estimation models to coexist)
> # The BigtableIO connector works well, and we can add an adapter layer on top 
> of it.  I have a proof of concept of it here: 
> [https://github.com/sduskis/cloud-bigtable-client/tree/add_beam/bigtable-dataflow-parent/bigtable-hbase-beam].
> # We can build a separate CBT HBase connector.
> I'm happy to do the work.  I would appreciate some guidance and discussion 
> about the right approach.
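
For illustration, an adapter layer as in option 2 might look roughly like the 
sketch below. This is hypothetical code, not the linked proof of concept; it 
assumes the bigtable-hbase client ships an Adapters.ROW_ADAPTER utility that 
converts a Bigtable protobuf Row into an HBase Result:

{code}
import com.google.bigtable.v2.Row;
import com.google.cloud.bigtable.hbase.adapters.Adapters;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.hbase.client.Result;

/** Hypothetical adapter: exposes BigtableIO's protobuf rows as HBase Results. */
class BigtableRowToHBaseResult
    extends PTransform<PCollection<Row>, PCollection<Result>> {
  @Override
  public PCollection<Result> expand(PCollection<Row> rows) {
    return rows.apply(
        ParDo.of(
            new DoFn<Row, Result>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Translate the Bigtable proto into the HBase client idiom.
                c.output(Adapters.ROW_ADAPTER.adaptResponse(c.element()));
              }
            }));
  }
}
{code}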



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException

2017-09-14 Thread Guenther Grill (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166404#comment-16166404
 ] 

Guenther Grill commented on BEAM-2943:
--

I double-checked by looking into the 
`target/word-count-beam-bundled-0.1.jar` file. The class is there.
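
For reference, this kind of check can also be scripted; a minimal sketch (the 
jar path and class name are taken from the report above):

{code}
import java.io.IOException;
import java.util.jar.JarFile;

public class JarCheck {
  public static void main(String[] args) throws IOException {
    String jar = "target/word-count-beam-bundled-0.1.jar";
    String entry =
        "org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.class";
    try (JarFile jarFile = new JarFile(jar)) {
      // getJarEntry returns null when the entry is absent from the archive.
      System.out.println(
          jarFile.getJarEntry(entry) != null ? "class is present" : "class is MISSING");
    }
  }
}
{code}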

> Beam Flink deployment results in ClassNotFoundException
> ---
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 
> (2017-08-06) x86_64 GNU/Linux
>Reporter: Guenther Grill
>Assignee: Aljoscha Krettek
>  Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/get-started/quickstart-java/ to 
> run beam program within a flink cluster. 
> The output of the dependency-command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>   
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]|  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started the flink cluster with the correct version with docker-compose
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6123"
> ports:
>   - "6123:6123"
>   - "8081:8081"
> volumes:
>   - /tmp:/tmp
> command: jobmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6121"
>   - "6122"
> depends_on:
>   - jobmanager
> command: taskmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The flink cluster works, but when I execute 
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Pflink-runner \
> -Dexec.args="--runner=FlinkRunner \
>   --inputFile=pom.xml \
>   --output=/path/to/counts \
>   --flinkMaster=[HOST_IP]:6123 \
>   --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy NoRestartStrategy for 
> a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Running initialization on master for job 
> wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Failed to submit job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at Read(CompressedSource) 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': 
> Deserializing the InputFormat 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
>  failed: Could not read the user code wrapper: 
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>   at 
> 

[jira] [Comment Edited] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException

2017-09-14 Thread Guenther Grill (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166404#comment-16166404
 ] 

Guenther Grill edited comment on BEAM-2943 at 9/14/17 2:57 PM:
---

I double-checked by looking into the 
{code}target/word-count-beam-bundled-0.1.jar{code} file. The class is there.


was (Author: guenhter):
I double checked it, by looking into the 
`target/word-count-beam-bundled-0.1.jar` file. The class is there.

> Beam Flink deployment results in ClassNotFoundException
> ---
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 
> (2017-08-06) x86_64 GNU/Linux
>Reporter: Guenther Grill
>Assignee: Aljoscha Krettek
>  Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/get-started/quickstart-java/ to 
> run a beam program within a flink cluster. 
> The output of the dependency-command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>   
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]|  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started the flink cluster with the correct version with docker-compose
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6123"
> ports:
>   - "6123:6123"
>   - "8081:8081"
> volumes:
>   - /tmp:/tmp
> command: jobmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6121"
>   - "6122"
> depends_on:
>   - jobmanager
> command: taskmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The flink cluster works, but when I execute 
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Pflink-runner \
> -Dexec.args="--runner=FlinkRunner \
>   --inputFile=pom.xml \
>   --output=/path/to/counts \
>   --flinkMaster=[HOST_IP]:6123 \
>   --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy NoRestartStrategy for 
> a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Running initialization on master for job 
> wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Failed to submit job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at Read(CompressedSource) 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': 
> Deserializing the InputFormat 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
>  failed: Could not read the user code wrapper: 
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
>   at 
> 

[jira] [Updated] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException

2017-09-14 Thread Guenther Grill (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guenther Grill updated BEAM-2943:
-
Description: 
Hi,

I followed the guide https://beam.apache.org/get-started/quickstart-java/ to 
run a beam program within a flink cluster. 

The output of the dependency-command is:

{code}
mvn dependency:tree -Pflink-runner |grep flink  

[INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
[INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
[INFO]|  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
[INFO]|  \- org.apache.flink:force-shading:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime
[INFO]|  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime
[INFO]|  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
[INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
{code}

Then I started the flink cluster with the correct version with docker-compose

{code}
export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10

docker-compose up -d
{code}

The compose file looks like this:

{code}
version: '3.3'
services:
  jobmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
expose:
  - "6123"
ports:
  - "6123:6123"
  - "8081:8081"
volumes:
  - /tmp:/tmp
command: jobmanager
environment:
  - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]

  taskmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
expose:
  - "6121"
  - "6122"
depends_on:
  - jobmanager
command: taskmanager
environment:
  - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
{code}

The flink cluster works, but when I execute 

{code}
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-Pflink-runner \
-Dexec.args="--runner=FlinkRunner \
  --inputFile=pom.xml \
  --output=/path/to/counts \
  --flinkMaster=[HOST_IP]:6123 \
  --filesToStage=target/word-count-beam-bundled-0.1.jar"
{code}

I get:

{code}
2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Submitting job a913f922506053e65e732eeb8336b3bd 
(wordcount-grg-0912063956-c7ea6199).
2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Using restart strategy NoRestartStrategy for 
a913f922506053e65e732eeb8336b3bd.
2017-09-12 06:39:57,227 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
via failover strategy: full graph restart
2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Running initialization on master for job 
wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager
- Failed to submit job a913f922506053e65e732eeb8336b3bd 
(wordcount-grg-0912063956-c7ea6199)
org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
'DataSource (at Read(CompressedSource) 
(org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': 
Deserializing the InputFormat 
(org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) 
failed: Could not read the user code wrapper: 
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
   

[jira] [Commented] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166276#comment-16166276
 ] 

Aljoscha Krettek commented on BEAM-2943:


Could you verify that 
{{org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}} is in 
the jar file? I just want to rule out the easy cases, but it could still be a 
more involved problem.

> Beam Flink deployment results in ClassNotFoundException
> ---
>
> Key: BEAM-2943
> URL: https://issues.apache.org/jira/browse/BEAM-2943
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Affects Versions: 2.1.0
> Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 
> (2017-08-06) x86_64 GNU/Linux
>Reporter: Guenther Grill
>Assignee: Aljoscha Krettek
>  Labels: flink
>
> Hi,
> I followed the guide https://beam.apache.org/documentation/runners/flink/ to 
> run a beam program within a flink cluster. 
> The output of the dependency-command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>   
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]|  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]|  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started the flink cluster with the correct version with docker-compose
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6123"
> ports:
>   - "6123:6123"
>   - "8081:8081"
> volumes:
>   - /tmp:/tmp
> command: jobmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
> image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
> expose:
>   - "6121"
>   - "6122"
> depends_on:
>   - jobmanager
> command: taskmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The flink cluster works, but when I execute 
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
> -Pflink-runner \
> -Dexec.args="--runner=FlinkRunner \
>   --inputFile=pom.xml \
>   --output=/path/to/counts \
>   --flinkMaster=[HOST_IP]:6123 \
>   --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Submitting job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Using restart strategy NoRestartStrategy for 
> a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers 
> via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Running initialization on master for job 
> wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager  
>   - Failed to submit job a913f922506053e65e732eeb8336b3bd 
> (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 
> 'DataSource (at Read(CompressedSource) 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': 
> Deserializing the InputFormat 
> (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a)
>  failed: Could not read the user code wrapper: 
> org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>   at 
> 

[jira] [Commented] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint

2017-09-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166124#comment-16166124
 ] 

Aljoscha Krettek commented on BEAM-2948:


I think the problem is that Beam only "instantiates" the Hadoop options 
that are passed via pipeline options at a point when it is too late (this is 
done in {{FileSystems.setDefaultPipelineOptions()}}).

The {{FileBasedSink}} uses a {{GroupByKey}} internally, which requires storing 
a {{FileResult}} in state. Decoding this (using {{FileResultCoder}}) will try 
to resolve the Hadoop FileSystem. When using the Heap state backend (or 
{{FsStateBackend}}) in Flink, state is eagerly deserialised when the 
pipeline/operator is started. 

I see some possible solutions:
 1. Use the {{RocksDBStateBackend}}, which deserialises state lazily, so decoding 
should happen after the Hadoop options have been instantiated.
 2. Change {{FileResultCoder}} to not eagerly get a Hadoop FileSystem but 
instead resolve it lazily when required (rough sketch below).
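
A rough sketch of the lazy variant in option 2 (a hypothetical helper, not the 
actual fix): the coder would keep the decoded string spec and defer the 
registrar lookup until the ResourceId is first needed, i.e. until after 
{{FileSystems.setDefaultPipelineOptions()}} has run:

{code}
import java.io.Serializable;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

/** Hypothetical holder that resolves the filesystem registrar lazily. */
class LazyResourceId implements Serializable {
  private final String spec;
  private final boolean isDirectory;
  private transient ResourceId resolved;

  LazyResourceId(String spec, boolean isDirectory) {
    this.spec = spec;
    this.isDirectory = isDirectory;
  }

  ResourceId get() {
    if (resolved == null) {
      // Deferred: only now do we need a registrar for the scheme (e.g. s3n).
      resolved = FileSystems.matchNewResource(spec, isDirectory);
    }
    return resolved;
  }
}
{code}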

> Unable to find registrar when restoring flink job from savepoint
> 
>
> Key: BEAM-2948
> URL: https://issues.apache.org/jira/browse/BEAM-2948
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
> Environment: Flink v1.2.1 job on a EMR cluster using Beam v2.0.0
>Reporter: Luke Cwik
>Assignee: Aljoscha Krettek
>Priority: Minor
>
> Reported: 
> https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E
> When I try to restore a job from a savepoint on one task manager I get the
> exception *Unable to find registrar for s3n*.
> The job can write files to s3 acting as a sink. So S3 access works except
> when restoring from a savepoint.
> I am passing the following configuration as the pipeline 
> HadoopFileSystemOptions options:
> {code}
> Configuration configuration = new Configuration();
> configuration.set("fs.defaultFS",
> String.format("s3n://%s", jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.default.name",
> String.format("s3n://%s", jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.s3.impl",
> "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> configuration.set("fs.s3n.awsAccessKeyId", jobOptions.getAwsAccessKey());
> configuration.set("fs.s3n.awsSecretAccessKey", jobOptions.getAwsSecretKey());
> {code}
> From my investigation it looks like the Beam 
> FileSystems.setDefaultPipelineOptions method is not called before 
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relies on the initialised 
> FileSystems.SCHEME_TO_FILESYSTEM map:
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n
> at
> org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059)
> at
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020)
> at
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87)
> at
> org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
> at
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
> ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3846: JStorm-runner: Remove unnecessary WARN log which mi...

2017-09-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3846


---


[2/2] beam git commit: This closes #3846

2017-09-14 Thread pei
This closes #3846


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd9c548b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd9c548b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd9c548b

Branch: refs/heads/jstorm-runner
Commit: cd9c548b6e1ee57f3a202da64e5e55bc6704b67f
Parents: ef70031 03cc311
Author: Pei He 
Authored: Thu Sep 14 19:29:31 2017 +0800
Committer: Pei He 
Committed: Thu Sep 14 19:29:31 2017 +0800

--
 .../beam/runners/jstorm/translation/TranslatorRegistry.java| 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--




[1/2] beam git commit: JStorm-runner: Remove unnecessary WARN log, which might cause confusion.

2017-09-14 Thread pei
Repository: beam
Updated Branches:
  refs/heads/jstorm-runner ef70031b7 -> cd9c548b6


JStorm-runner: Remove unnecessary WARN log, which might cause confusion.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03cc311c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03cc311c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03cc311c

Branch: refs/heads/jstorm-runner
Commit: 03cc311cfbabd92390d7a848a135b59d9d80530c
Parents: ef70031
Author: basti.lj 
Authored: Wed Sep 13 16:17:14 2017 +0800
Committer: basti.lj 
Committed: Wed Sep 13 16:17:58 2017 +0800

--
 .../beam/runners/jstorm/translation/TranslatorRegistry.java| 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/03cc311c/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
--
diff --git 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
index c8ea545..f297dc3 100644
--- 
a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
+++ 
b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java
@@ -48,10 +48,6 @@ class TranslatorRegistry {
   }
 
   public static TransformTranslator getTranslator(PTransform 
transform) {
-TransformTranslator translator = TRANSLATORS.get(transform.getClass());
-if (translator == null) {
-  LOG.warn("Unsupported operator={}", transform.getClass().getName());
-}
-return translator;
+return TRANSLATORS.get(transform.getClass());
   }
 }



[jira] [Commented] (BEAM-2467) KinesisIO watermark based on approximateArrivalTimestamp

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166076#comment-16166076
 ] 

ASF GitHub Bot commented on BEAM-2467:
--

GitHub user pawel-kaczmarczyk opened a pull request:

https://github.com/apache/beam/pull/3851

[BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [X] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [X] Each commit in the pull request should have a meaningful subject 
line and body.
 - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [X] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [X] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ocadotechnology/incubator-beam 
kinesis_watermark

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3851.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3851


commit 7757ca61fb4b392469ebfd0d378cd7a61f46ae48
Author: Pawel Kaczmarczyk 
Date:   2017-09-14T10:52:16Z

[BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp




> KinesisIO watermark based on approximateArrivalTimestamp
> 
>
> Key: BEAM-2467
> URL: https://issues.apache.org/jira/browse/BEAM-2467
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-extensions
>Reporter: Paweł Kaczmarczyk
>Assignee: Paweł Kaczmarczyk
>
> In Kinesis we can start reading the stream at some point in the past during 
> the retention period (up to 7 days). With the current approach for setting a 
> record's timestamp and the watermark (both are always set to the current time, 
> i.e. Instant.now()), we can't observe the actual position in the stream.
> So the idea is to change this behaviour and set the record timestamp based on 
> the 
> [ApproximateArrivalTimestamp|http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp].
>  The watermark will be set according to the last read record's timestamp. 
> ApproximateArrivalTimestamp is still an approximation and may result in 
> records with out-of-order timestamps, which in turn may result in some 
> events being marked as late. This, however, should not be a frequent issue, 
> and even if it happens it should be a matter of milliseconds or seconds, so 
> it can be handled even with a tiny allowedLateness setting.
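
As a minimal sketch of the proposed behaviour (a hypothetical helper, not the 
actual patch; it assumes the AWS SDK exposes the arrival time as a 
java.util.Date via getApproximateArrivalTimestamp()):

{code}
import java.util.Date;
import org.joda.time.Instant;

/** Tracks record timestamps and a watermark derived from arrival times. */
class ArrivalTimeWatermark {
  // Keep the watermark monotone by tracking the max timestamp read so far.
  private Instant watermark = new Instant(0L);

  /** Returns the record's event-time timestamp and advances the watermark. */
  Instant timestampOf(Date approximateArrivalTimestamp) {
    Instant ts = new Instant(approximateArrivalTimestamp.getTime());
    if (ts.isAfter(watermark)) {
      watermark = ts;
    }
    return ts;
  }

  Instant currentWatermark() {
    return watermark;
  }
}
{code}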



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3851: [BEAM-2467] Kinesis source watermark based on appro...

2017-09-14 Thread pawel-kaczmarczyk
GitHub user pawel-kaczmarczyk opened a pull request:

https://github.com/apache/beam/pull/3851

[BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [X] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [X] Each commit in the pull request should have a meaningful subject 
line and body.
 - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [X] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [X] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ocadotechnology/incubator-beam 
kinesis_watermark

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3851.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3851


commit 7757ca61fb4b392469ebfd0d378cd7a61f46ae48
Author: Pawel Kaczmarczyk 
Date:   2017-09-14T10:52:16Z

[BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp




---


[jira] [Commented] (BEAM-2790) Error while reading from Amazon S3 via Hadoop File System

2017-09-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165999#comment-16165999
 ] 

ASF GitHub Bot commented on BEAM-2790:
--

GitHub user echauchot opened a pull request:

https://github.com/apache/beam/pull/3850

[BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [X] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [X] Each commit in the pull request should have a meaningful subject 
line and body.
 - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [X] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [X] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [X] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---
This is the continuation of PR https://github.com/apache/beam/pull/3744. It 
fixes the code to deal with the offsets/limits so that it has the same behavior 
as read(ByteBuffer), and it adds corner-case tests.
R: @lukecwik @steveloughran
CC: @iemejia 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/echauchot/beam 
byte_array_instead_of_ByBuffer_HadoopFS2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3850.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3850


commit ed5ba0c2ecc6e3a0b305cb9928d610f8ab58acbe
Author: Etienne Chauchot 
Date:   2017-09-14T08:28:12Z

[BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem




> Error while reading from Amazon S3 via Hadoop File System
> -
>
> Key: BEAM-2790
> URL: https://issues.apache.org/jira/browse/BEAM-2790
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-java-extensions
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Ismaël Mejía
>Assignee: Etienne Chauchot
> Fix For: 2.2.0
>
>
> If you try to use hadoop-aws with Beam to read from AWS S3, it breaks because 
> S3AInputStream (the stream implementation behind Hadoop's FSDataInputStream) 
> is not ByteBufferReadable. 
> {code}
> Exception in thread "main" java.lang.UnsupportedOperationException: 
> Byte-buffer read unsupported by input stream
>   at 
> org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:146)
>   at 
> org.apache.beam.sdk.io.hdfs.HadoopFileSystem$HadoopSeekableByteChannel.read(HadoopFileSystem.java:192)
>   at 
> org.apache.beam.sdk.io.TextSource$TextBasedReader.tryToEnsureNumberOfBytesInBuffer(TextSource.java:232)
>   at 
> org.apache.beam.sdk.io.TextSource$TextBasedReader.findSeparatorBounds(TextSource.java:166)
>   at 
> org.apache.beam.sdk.io.TextSource$TextBasedReader.readNextRecord(TextSource.java:198)
>   at 
> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:481)
>   at 
> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476)
>   at 
> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:261)
> {code}
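
For illustration, the general shape of such a byte[] fallback might be as 
follows (a sketch under the assumptions above, not the actual patch):

{code}
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;

class ByteArrayReadFallback {
  /**
   * Reads via a plain byte[] (supported by every InputStream) and copies the
   * bytes into the caller's ByteBuffer, mirroring read(ByteBuffer) semantics.
   */
  static int read(FSDataInputStream in, ByteBuffer dst) throws IOException {
    byte[] buf = new byte[dst.remaining()];
    int numRead = in.read(buf, 0, buf.length);
    if (numRead > 0) {
      dst.put(buf, 0, numRead);  // honors the buffer's position and limit
    }
    return numRead;
  }
}
{code}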



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] beam pull request #3850: [BEAM-2790] Use byte[] instead of ByteBuffer to rea...

2017-09-14 Thread echauchot
GitHub user echauchot opened a pull request:

https://github.com/apache/beam/pull/3850

[BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem

Follow this checklist to help us incorporate your contribution quickly and 
easily:

 - [X] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
 - [X] Each commit in the pull request should have a meaningful subject 
line and body.
 - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
 - [X] Write a pull request description that is detailed enough to 
understand what the pull request does, how, and why.
 - [X] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
 - [X] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

---
This is the continuation of PR https://github.com/apache/beam/pull/3744. It 
fixes the code to deal with the offsets/limits so that it has the same behavior 
as read(ByteBuffer), and it adds corner-case tests.
R: @lukecwik @steveloughran
CC: @iemejia 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/echauchot/beam 
byte_array_instead_of_ByBuffer_HadoopFS2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/beam/pull/3850.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3850


commit ed5ba0c2ecc6e3a0b305cb9928d610f8ab58acbe
Author: Etienne Chauchot 
Date:   2017-09-14T08:28:12Z

[BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem




---


[GitHub] beam pull request #3841: flink-runner: constructs exception string only when...

2017-09-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/beam/pull/3841


---


[2/2] beam git commit: This closes #3841

2017-09-14 Thread pei
This closes #3841


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa4ecea2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa4ecea2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa4ecea2

Branch: refs/heads/master
Commit: fa4ecea26713244a83521ce1ca2864f4ad4409c8
Parents: 50532f0 31f51d2
Author: Pei He 
Authored: Thu Sep 14 17:21:09 2017 +0800
Committer: Pei He 
Committed: Thu Sep 14 17:21:09 2017 +0800

--
 .../translation/functions/FlinkAssignContext.java  | 17 -
 1 file changed, 8 insertions(+), 9 deletions(-)
--




[1/2] beam git commit: flink-runner: constructs exception string only when necessary; this reduces expensive per-element calls (String.format and getSimpleName) in FlinkAssignContext.

2017-09-14 Thread pei
Repository: beam
Updated Branches:
  refs/heads/master 50532f0a9 -> fa4ecea26


flink-runner: constructs exception string only when necessary; this reduces 
expensive per-element calls (String.format and getSimpleName) in 
FlinkAssignContext.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31f51d28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31f51d28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31f51d28

Branch: refs/heads/master
Commit: 31f51d28c574ea1792312a528b25793230787486
Parents: 50532f0
Author: Pei He 
Authored: Tue Sep 12 17:26:28 2017 +0800
Committer: Pei He 
Committed: Thu Sep 14 17:20:54 2017 +0800

--
 .../translation/functions/FlinkAssignContext.java  | 17 -
 1 file changed, 8 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/31f51d28/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
--
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
index 447b1e5..26d6721 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.common.collect.Iterables;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -35,13 +33,14 @@ class FlinkAssignContext
 
   FlinkAssignContext(WindowFn fn, WindowedValue value) {
 fn.super();
-checkArgument(
-Iterables.size(value.getWindows()) == 1,
-String.format(
-"%s passed to window assignment must be in a single window, but it 
was in %s: %s",
-WindowedValue.class.getSimpleName(),
-Iterables.size(value.getWindows()),
-value.getWindows()));
+if (Iterables.size(value.getWindows()) != 1) {
+  throw new IllegalArgumentException(
+  String.format(
+  "%s passed to window assignment must be in a single window, but 
it was in %s: %s",
+  WindowedValue.class.getSimpleName(),
+  Iterables.size(value.getWindows()),
+  value.getWindows()));
+}
 this.value = value;
   }