[GitHub] beam pull request #3854: Speeds up CompressedSourceTest
GitHub user jkff opened a pull request: https://github.com/apache/beam/pull/3854 Speeds up CompressedSourceTest This test doesn't need a TestPipeline. This makes the test at least 10x faster and less flaky. R: @lukecwik You can merge this pull request into a Git repository by running: $ git pull https://github.com/jkff/incubator-beam compressed-source Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3854.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3854 commit 952a1c77c6cbec79c2a131812e2a3d144b906d6a Author: Eugene Kirpichov Date: 2017-09-15T00:51:28Z Speeds up CompressedSourceTest ---
[GitHub] beam pull request #3703: [BEAM-1637] Create Elasticsearch IO compatible with...
Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/3703 ---
[jira] [Commented] (BEAM-1637) Create Elasticsearch IO compatible with ES 5.x
[ https://issues.apache.org/jira/browse/BEAM-1637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16167154#comment-16167154 ] ASF GitHub Bot commented on BEAM-1637: -- Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/3703 > Create Elasticsearch IO compatible with ES 5.x > -- > > Key: BEAM-1637 > URL: https://issues.apache.org/jira/browse/BEAM-1637 > Project: Beam > Issue Type: New Feature > Components: sdk-java-extensions >Reporter: Etienne Chauchot >Assignee: Etienne Chauchot >Priority: Minor > > The current Elasticsearch IO (see > https://issues.apache.org/jira/browse/BEAM-425) is only compatible with > Elasticsearch v 2.x. The aim is to have an IO compatible with ES v 5.x. > Beyond being able to address v5.x elasticsearch instances, we could also > leverage the use of the Elasticsearch pipeline API and also better split the > dataset (be as close as possible of desiredBundleSize) thanks to the new ES > split API that allows ES shards splitting. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[3/4] beam git commit: Piggyback: deflakes MongoDBGridFSIOTest which failed during merge
Piggyback: deflakes MongoDBGridFSIOTest which failed during merge Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6c21cab5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6c21cab5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6c21cab5 Branch: refs/heads/master Commit: 6c21cab54c22b8ed4d6ccbcd20c27cdb64277b7c Parents: 0d4fd19 Author: Eugene Kirpichov Authored: Thu Sep 14 17:22:33 2017 -0700 Committer: Eugene Kirpichov Committed: Thu Sep 14 17:22:33 2017 -0700 -- .../java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/6c21cab5/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java -- diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index 826af1c..19f8d87 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -233,7 +233,7 @@ public class MongoDBGridFSIOTest implements Serializable { } } }) -.withSkew(new Duration(3601000L)) +.withSkew(new Duration(361L)) .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of(; PAssert.thatSingleton(output.apply("Count All", Count. >globally()))
[4/4] beam git commit: This closes #3703: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x
This closes #3703: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/307b0368 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/307b0368 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/307b0368 Branch: refs/heads/master Commit: 307b03680ba8b6278d80a07b1ce73421804aa994 Parents: e7601aa 6c21cab Author: Eugene KirpichovAuthored: Thu Sep 14 17:24:49 2017 -0700 Committer: Eugene Kirpichov Committed: Thu Sep 14 17:24:49 2017 -0700 -- pom.xml | 22 ++ .../elasticsearch-tests-2/pom.xml | 60 .../src/test/contrib/create_elk_container.sh| 24 ++ .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 123 +++ .../io/elasticsearch/ElasticsearchIOTest.java | 185 ++ .../elasticsearch-tests-5/pom.xml | 126 +++ .../src/test/contrib/create_elk_container.sh| 24 ++ .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 122 +++ .../io/elasticsearch/ElasticsearchIOTest.java | 184 ++ .../org/elasticsearch/bootstrap/JarHell.java| 39 ++ .../elasticsearch-tests-common/pom.xml | 81 + .../elasticsearch/ElasticSearchIOTestUtils.java | 142 .../elasticsearch/ElasticsearchIOITCommon.java | 92 + .../ElasticsearchIOTestCommon.java | 306 sdks/java/io/elasticsearch-tests/pom.xml| 144 sdks/java/io/elasticsearch/pom.xml | 234 +--- .../sdk/io/elasticsearch/ElasticsearchIO.java | 229 +++- .../beam/sdk/io/elasticsearch/package-info.java | 1 - .../src/test/contrib/create_elk_container.sh| 24 -- .../elasticsearch/ElasticSearchIOTestUtils.java | 138 --- .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 155 .../io/elasticsearch/ElasticsearchIOTest.java | 355 --- .../elasticsearch/ElasticsearchTestDataSet.java | 97 - .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 2 +- sdks/java/io/pom.xml| 1 + sdks/java/javadoc/pom.xml | 10 + 26 files changed, 1926 insertions(+), 994 deletions(-) --
[1/4] beam git commit: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x
Repository: beam Updated Branches: refs/heads/master e7601aac3 -> 307b03680 http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java -- diff --git a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java index 5041eec..5eebe00 100644 --- a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java +++ b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java @@ -30,6 +30,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.security.KeyStore; import java.util.ArrayList; import java.util.Arrays; @@ -63,7 +64,6 @@ import org.apache.http.conn.ssl.TrustSelfSignedStrategy; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; -import org.apache.http.message.BasicHeader; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.ssl.SSLContexts; @@ -73,7 +73,6 @@ import org.elasticsearch.client.RestClientBuilder; /** * Transforms for reading and writing data from/to Elasticsearch. 
- * This IO is only compatible with Elasticsearch v2.x * * Reading from Elasticsearch * @@ -145,6 +144,7 @@ public class ElasticsearchIO { private static final ObjectMapper mapper = new ObjectMapper(); + @VisibleForTesting static JsonNode parseResponse(Response response) throws IOException { return mapper.readValue(response.getEntity().getContent(), JsonNode.class); } @@ -153,23 +153,23 @@ public class ElasticsearchIO { @AutoValue public abstract static class ConnectionConfiguration implements Serializable { -abstract List getAddresses(); +public abstract List getAddresses(); @Nullable -abstract String getUsername(); +public abstract String getUsername(); @Nullable -abstract String getPassword(); +public abstract String getPassword(); @Nullable -abstract String getKeystorePath(); +public abstract String getKeystorePath(); @Nullable -abstract String getKeystorePassword(); +public abstract String getKeystorePassword(); -abstract String getIndex(); +public abstract String getIndex(); -abstract String getType(); +public abstract String getType(); abstract Builder builder(); @@ -267,6 +267,7 @@ public class ElasticsearchIO { builder.addIfNotNull(DisplayData.item("keystore.path", getKeystorePath())); } +@VisibleForTesting RestClient createClient() throws IOException { HttpHost[] hosts = new HttpHost[getAddresses().size()]; int i = 0; @@ -399,9 +400,8 @@ public class ElasticsearchIO { checkState( connectionConfiguration != null, "withConnectionConfiguration() is required"); - checkVersion(connectionConfiguration); - return input.apply( - org.apache.beam.sdk.io.Read.from(new BoundedElasticsearchSource(this, null))); + return input.apply(org.apache.beam.sdk.io.Read + .from(new BoundedElasticsearchSource(this, null, null, null))); } @Override @@ -416,55 +416,94 @@ public class ElasticsearchIO { /** A {@link BoundedSource} reading from Elasticsearch. 
*/ @VisibleForTesting - static class BoundedElasticsearchSource extends BoundedSource { + public static class BoundedElasticsearchSource extends BoundedSource { -private final ElasticsearchIO.Read spec; -// shardPreference is the shard number where the source will read the documents -@Nullable private final String shardPreference; +private int backendVersion; -BoundedElasticsearchSource(Read spec, @Nullable String shardPreference) { +private final Read spec; +// shardPreference is the shard id where the source will read the documents +@Nullable +private final String shardPreference; +@Nullable +private final Integer numSlices; +@Nullable +private final Integer sliceId; + +//constructor used in split() when we know the backend version +private BoundedElasticsearchSource(Read spec, @Nullable String shardPreference, +@Nullable Integer numSlices, @Nullable Integer sliceId, int backendVersion) { + this.backendVersion = backendVersion; this.spec = spec; this.shardPreference = shardPreference; + this.numSlices = numSlices; + this.sliceId = sliceId; } +@VisibleForTesting +BoundedElasticsearchSource(Read spec, @Nullable String
[2/4] beam git commit: [BEAM-1637] Create Elasticsearch IO compatible with ES 5.x
[BEAM-1637] Create Elasticsearch IO compatible with ES 5.x Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0d4fd190 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0d4fd190 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0d4fd190 Branch: refs/heads/master Commit: 0d4fd19076722515f29c34144cc93aab3795801f Parents: e7601aa Author: Etienne ChauchotAuthored: Mon Jun 26 10:58:21 2017 +0200 Committer: Eugene Kirpichov Committed: Thu Sep 14 17:01:27 2017 -0700 -- pom.xml | 22 ++ .../elasticsearch-tests-2/pom.xml | 60 .../src/test/contrib/create_elk_container.sh| 24 ++ .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 123 +++ .../io/elasticsearch/ElasticsearchIOTest.java | 185 ++ .../elasticsearch-tests-5/pom.xml | 126 +++ .../src/test/contrib/create_elk_container.sh| 24 ++ .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 122 +++ .../io/elasticsearch/ElasticsearchIOTest.java | 184 ++ .../org/elasticsearch/bootstrap/JarHell.java| 39 ++ .../elasticsearch-tests-common/pom.xml | 81 + .../elasticsearch/ElasticSearchIOTestUtils.java | 142 .../elasticsearch/ElasticsearchIOITCommon.java | 92 + .../ElasticsearchIOTestCommon.java | 306 sdks/java/io/elasticsearch-tests/pom.xml| 144 sdks/java/io/elasticsearch/pom.xml | 234 +--- .../sdk/io/elasticsearch/ElasticsearchIO.java | 229 +++- .../beam/sdk/io/elasticsearch/package-info.java | 1 - .../src/test/contrib/create_elk_container.sh| 24 -- .../elasticsearch/ElasticSearchIOTestUtils.java | 138 --- .../sdk/io/elasticsearch/ElasticsearchIOIT.java | 155 .../io/elasticsearch/ElasticsearchIOTest.java | 355 --- .../elasticsearch/ElasticsearchTestDataSet.java | 97 - sdks/java/io/pom.xml| 1 + sdks/java/javadoc/pom.xml | 10 + 25 files changed, 1925 insertions(+), 993 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/pom.xml -- diff --git a/pom.xml b/pom.xml index f9644dd..a2d6aae 100644 --- a/pom.xml +++ b/pom.xml @@ -462,6 +462,28 @@ 
org.apache.beam +beam-sdks-java-io-elasticsearch-tests-common +${project.version} +test +tests + + + +org.apache.beam +beam-sdks-java-io-elasticsearch-tests-2 +${project.version} +test + + + +org.apache.beam +beam-sdks-java-io-elasticsearch-tests-5 +${project.version} +test + + + +org.apache.beam beam-sdks-java-io-google-cloud-platform ${project.version} http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml -- diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml new file mode 100644 index 000..a56ffa4 --- /dev/null +++ b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/pom.xml @@ -0,0 +1,60 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + 4.0.0 + + +org.apache.beam +beam-sdks-java-io-elasticsearch-tests-parent +2.2.0-SNAPSHOT +../pom.xml + + + beam-sdks-java-io-elasticsearch-tests-2 + Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 2.x + Tests of ElasticsearchIO on Elasticsearch 2.x + + +2.4.1 + + + + + + com.google.guava + guava + test + + + + org.apache.beam + beam-sdks-java-io-elasticsearch-tests-common + test + tests + + + + org.elasticsearch + elasticsearch + ${elasticsearch.version} + test + + + + http://git-wip-us.apache.org/repos/asf/beam/blob/0d4fd190/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh -- diff --git a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/contrib/create_elk_container.sh new file mode 100755 index 000..48f6064 --- /dev/null +++
[jira] [Assigned] (BEAM-2834) NullPointerException @ BigQueryServicesImpl.java:759
[ https://issues.apache.org/jira/browse/BEAM-2834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eugene Kirpichov reassigned BEAM-2834: -- Assignee: Reuven Lax (was: Thomas Groh) > NullPointerException @ BigQueryServicesImpl.java:759 > > > Key: BEAM-2834 > URL: https://issues.apache.org/jira/browse/BEAM-2834 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Affects Versions: 2.1.0 >Reporter: Andy Barron >Assignee: Reuven Lax > Fix For: 2.2.0 > > > {code} > java.lang.NullPointerException > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:759) > at > org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:809) > at > org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:126) > at > org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:96) > {code} > Going through the stack trace, the likely culprit is a null {{retryPolicy}} > in {{StreamingWriteFn}}. > For context, this error showed up about 70 times between 1 am and 1 pm > Pacific time (2017-08-31) on a Dataflow streaming job. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery
[ https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath updated BEAM-2879: - Affects Version/s: Not applicable > Implement and use an Avro coder rather than the JSON one for intermediary > files to be loaded in BigQuery > > > Key: BEAM-2879 > URL: https://issues.apache.org/jira/browse/BEAM-2879 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Reporter: Black Phoenix >Priority: Minor > Labels: starter > > Before being loaded in BigQuery, temporary files are created and encoded in > JSON. Which is a costly solution compared to an Avro alternative -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery
[ https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath updated BEAM-2879: - Affects Version/s: (was: 2.1.0) > Implement and use an Avro coder rather than the JSON one for intermediary > files to be loaded in BigQuery > > > Key: BEAM-2879 > URL: https://issues.apache.org/jira/browse/BEAM-2879 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Reporter: Black Phoenix >Priority: Minor > Labels: starter > > Before being loaded in BigQuery, temporary files are created and encoded in > JSON. Which is a costly solution compared to an Avro alternative -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery
[ https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath updated BEAM-2879: - Affects Version/s: (was: Not applicable) > Implement and use an Avro coder rather than the JSON one for intermediary > files to be loaded in BigQuery > > > Key: BEAM-2879 > URL: https://issues.apache.org/jira/browse/BEAM-2879 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Reporter: Black Phoenix >Priority: Minor > Labels: starter > > Before being loaded in BigQuery, temporary files are created and encoded in > JSON. Which is a costly solution compared to an Avro alternative -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2857) Create FileIO in Python
[ https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath updated BEAM-2857: - Labels: starter (was: ) > Create FileIO in Python > --- > > Key: BEAM-2857 > URL: https://issues.apache.org/jira/browse/BEAM-2857 > Project: Beam > Issue Type: New Feature > Components: sdk-py >Reporter: Eugene Kirpichov > Labels: starter > > Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), > which together cover the majority of needs for general-purpose file > ingestion. Beam Python should have something similar. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-2857) Create FileIO in Python
[ https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath reassigned BEAM-2857: Assignee: (was: Ahmet Altay) > Create FileIO in Python > --- > > Key: BEAM-2857 > URL: https://issues.apache.org/jira/browse/BEAM-2857 > Project: Beam > Issue Type: New Feature > Components: sdk-py >Reporter: Eugene Kirpichov > Labels: starter > > Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), > which together cover the majority of needs for general-purpose file > ingestion. Beam Python should have something similar. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-2857) Create FileIO in Python
[ https://issues.apache.org/jira/browse/BEAM-2857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath reassigned BEAM-2857: Assignee: Ahmet Altay (was: Chamikara Jayalath) > Create FileIO in Python > --- > > Key: BEAM-2857 > URL: https://issues.apache.org/jira/browse/BEAM-2857 > Project: Beam > Issue Type: New Feature > Components: sdk-py >Reporter: Eugene Kirpichov >Assignee: Ahmet Altay > Labels: starter > > Beam Java has a FileIO with operations: match()/matchAll(), readMatches(), > which together cover the majority of needs for general-purpose file > ingestion. Beam Python should have something similar. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery
[ https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath updated BEAM-2879: - Labels: starter (was: ) > Implement and use an Avro coder rather than the JSON one for intermediary > files to be loaded in BigQuery > > > Key: BEAM-2879 > URL: https://issues.apache.org/jira/browse/BEAM-2879 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Affects Versions: 2.1.0 >Reporter: Black Phoenix >Priority: Minor > Labels: starter > > Before being loaded in BigQuery, temporary files are created and encoded in > JSON. Which is a costly solution compared to an Avro alternative -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery
[ https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chamikara Jayalath reassigned BEAM-2879: Assignee: (was: Chamikara Jayalath) > Implement and use an Avro coder rather than the JSON one for intermediary > files to be loaded in BigQuery > > > Key: BEAM-2879 > URL: https://issues.apache.org/jira/browse/BEAM-2879 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Affects Versions: 2.1.0 >Reporter: Black Phoenix >Priority: Minor > Labels: starter > > Before being loaded in BigQuery, temporary files are created and encoded in > JSON. Which is a costly solution compared to an Avro alternative -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2879) Implement and use an Avro coder rather than the JSON one for intermediary files to be loaded in BigQuery
[ https://issues.apache.org/jira/browse/BEAM-2879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166960#comment-16166960 ] Chamikara Jayalath commented on BEAM-2879: -- Thanks for creating this. Looks like this is a low hanging fruit to improve performance. [~kirpichov] [~reuvenlax] is there any reason why we didn't do this so far ? > Implement and use an Avro coder rather than the JSON one for intermediary > files to be loaded in BigQuery > > > Key: BEAM-2879 > URL: https://issues.apache.org/jira/browse/BEAM-2879 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Affects Versions: 2.1.0 >Reporter: Black Phoenix >Assignee: Chamikara Jayalath >Priority: Minor > Labels: starter > > Before being loaded in BigQuery, temporary files are created and encoded in > JSON. Which is a costly solution compared to an Avro alternative -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-2808) Improve error message when DoFn @ProcessElement has the wrong window type
[ https://issues.apache.org/jira/browse/BEAM-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Cwik reassigned BEAM-2808: --- Assignee: Daniel Oliveira (was: Luke Cwik) > Improve error message when DoFn @ProcessElement has the wrong window type > - > > Key: BEAM-2808 > URL: https://issues.apache.org/jira/browse/BEAM-2808 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Daniel Oliveira >Priority: Minor > Labels: starter > > The message today is something like this: > {code} > processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow) > expects window type org.apache.beam.sdk.transforms.windowing.IntervalWindow, > which is not a supertype of actual window type > org.apache.beam.sdk.transforms.windowing.GlobalWindow > {code} > Someone suggested this: > {code} > processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow) > unable to provide window -- expects window type > org.apache.beam.sdk.transforms.windowing.IntervalWindow (from parameter), > which is not a supertype of actual window type > org.apache.beam.sdk.transforms.windowing.GlobalWindow (assigned by windowing) > {code} > If anyone wants to pick this up and wordsmith it more, strip some of the > namespaces (where reasonable), etc, that would be great. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (BEAM-2808) Improve error message when DoFn @ProcessElement has the wrong window type
[ https://issues.apache.org/jira/browse/BEAM-2808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Cwik reassigned BEAM-2808: --- Assignee: Luke Cwik > Improve error message when DoFn @ProcessElement has the wrong window type > - > > Key: BEAM-2808 > URL: https://issues.apache.org/jira/browse/BEAM-2808 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Luke Cwik >Priority: Minor > Labels: starter > > The message today is something like this: > {code} > processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow) > expects window type org.apache.beam.sdk.transforms.windowing.IntervalWindow, > which is not a supertype of actual window type > org.apache.beam.sdk.transforms.windowing.GlobalWindow > {code} > Someone suggested this: > {code} > processElement(org.apache.beam.sdk.transforms.DoFn$ProcessContext,org.apache.beam.sdk.transforms.windowing.IntervalWindow) > unable to provide window -- expects window type > org.apache.beam.sdk.transforms.windowing.IntervalWindow (from parameter), > which is not a supertype of actual window type > org.apache.beam.sdk.transforms.windowing.GlobalWindow (assigned by windowing) > {code} > If anyone wants to pick this up and wordsmith it more, strip some of the > namespaces (where reasonable), etc, that would be great. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem
[ https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166841#comment-16166841 ] Jacob Marble commented on BEAM-2500: Luke, thanks! I may have found what you're referencing in AbstractGoogleAsyncWriteChannel.uploadBufferSize. Default is 64MB, or 8MB: GCS_UPLOAD_GRANULARITY = 8 * 1024 * 1024; UPLOAD_CHUNK_SIZE_DEFAULT = Runtime.getRuntime().maxMemory() < 512 * 1024 * 1024 ? GCS_UPLOAD_GRANULARITY : 8 * GCS_UPLOAD_GRANULARITY; > Add support for S3 as a Apache Beam FileSystem > -- > > Key: BEAM-2500 > URL: https://issues.apache.org/jira/browse/BEAM-2500 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Luke Cwik >Priority: Minor > Attachments: hadoop_fs_patch.patch > > > Note that this is for providing direct integration with S3 as an Apache Beam > FileSystem. > There is already support for using the Hadoop S3 connector by depending on > the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with > a S3 configuration[3]. > 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system > 2: > https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53 > 3: https://wiki.apache.org/hadoop/AmazonS3 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem
[ https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166823#comment-16166823 ] Luke Cwik commented on BEAM-2500: - The GCS implementation uses a fixed size buffer of 32 or 64mbs (don't remember which) so buffering in memory is a common practice. 500 mbs does seem like a lot but even if you went with 32 mbs, you could support files up to ~320 gb. > Add support for S3 as a Apache Beam FileSystem > -- > > Key: BEAM-2500 > URL: https://issues.apache.org/jira/browse/BEAM-2500 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Luke Cwik >Priority: Minor > Attachments: hadoop_fs_patch.patch > > > Note that this is for providing direct integration with S3 as an Apache Beam > FileSystem. > There is already support for using the Hadoop S3 connector by depending on > the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with > a S3 configuration[3]. > 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system > 2: > https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53 > 3: https://wiki.apache.org/hadoop/AmazonS3 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2500) Add support for S3 as a Apache Beam FileSystem
[ https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166803#comment-16166803 ] Jacob Marble commented on BEAM-2500: Multipart upload could get us around the content length requirement, but it's awkward. An object can be 5TB, and a multipart upload can have 10,000 parts, so I could read 500MB at a time into memory, ship those chunks. Bad idea. Still can't see how Beam can indicate content length to a FileSystem sink. I'll move on to source stuff for a while. > Add support for S3 as a Apache Beam FileSystem > -- > > Key: BEAM-2500 > URL: https://issues.apache.org/jira/browse/BEAM-2500 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Luke Cwik >Priority: Minor > Attachments: hadoop_fs_patch.patch > > > Note that this is for providing direct integration with S3 as an Apache Beam > FileSystem. > There is already support for using the Hadoop S3 connector by depending on > the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with > a S3 configuration[3]. > 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system > 2: > https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53 > 3: https://wiki.apache.org/hadoop/AmazonS3 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2795) FlinkRunner: translate using SDK-agnostic means
[ https://issues.apache.org/jira/browse/BEAM-2795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166792#comment-16166792 ] ASF GitHub Bot commented on BEAM-2795: -- Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/3813 > FlinkRunner: translate using SDK-agnostic means > --- > > Key: BEAM-2795 > URL: https://issues.apache.org/jira/browse/BEAM-2795 > Project: Beam > Issue Type: Sub-task > Components: runner-flink >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles > Labels: portability > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] beam pull request #3813: [BEAM-2795] Key FlinkRunner streaming translation o...
Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/3813 ---
[2/2] beam git commit: This closes #3813: [BEAM-2795] Key FlinkRunner streaming translation off URN
This closes #3813: [BEAM-2795] Key FlinkRunner streaming translation off URN Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e7601aac Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e7601aac Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e7601aac Branch: refs/heads/master Commit: e7601aac3a1c9863ea67f305dc9e1b68041d9d8c Parents: fa4ecea be9fb29 Author: Kenneth Knowles Authored: Thu Sep 14 11:23:37 2017 -0700 Committer: Kenneth Knowles Committed: Thu Sep 14 11:23:37 2017 -0700 -- .../construction/PTransformTranslation.java | 27 ++- runners/flink/pom.xml | 5 + .../runners/flink/CreateStreamingFlinkView.java | 3 + .../FlinkStreamingTransformTranslators.java | 205 +-- 4 files changed, 215 insertions(+), 25 deletions(-) --
[jira] [Commented] (BEAM-2500) Add support for S3 as an Apache Beam FileSystem
[ https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166741#comment-16166741 ] Jacob Marble commented on BEAM-2500: Chamikara, thanks for your comment. I'll switch my implementation to multipart after I have something working, just got the simple 5GB version written. I'll also give closer consideration to the credentials question after I have the harder parts complete. For now, just using flags via PipelineOptions. So I have completed enough of this to test it out, except one problem. S3 requires the content length before writing any data, or else the client buffers the entire content in memory before writing. I have added contentLength to my S3CreateOptions, but how to set that value before S3FileSystem.create() is called? > Add support for S3 as a Apache Beam FileSystem > -- > > Key: BEAM-2500 > URL: https://issues.apache.org/jira/browse/BEAM-2500 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Luke Cwik >Priority: Minor > Attachments: hadoop_fs_patch.patch > > > Note that this is for providing direct integration with S3 as an Apache Beam > FileSystem. > There is already support for using the Hadoop S3 connector by depending on > the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with > a S3 configuration[3]. > 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system > 2: > https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53 > 3: https://wiki.apache.org/hadoop/AmazonS3 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2500) Add support for S3 as an Apache Beam FileSystem
[ https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166735#comment-16166735 ] Luke Cwik commented on BEAM-2500: - Performing the multipart download/upload will become important as 5GiBs has limited use but start off implementing the simpler thing as multipart upload/download can come later. http://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html Amazon supports an efficient copy operation if you specify "x-amz-copy-source" as a header where you don't need to upload the bytes and it just adds some metadata that points to the same set of bytes. Depending on which Amazon S3 Java library you use, they may or may not expose this flexibility. > Add support for S3 as a Apache Beam FileSystem > -- > > Key: BEAM-2500 > URL: https://issues.apache.org/jira/browse/BEAM-2500 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Luke Cwik >Priority: Minor > Attachments: hadoop_fs_patch.patch > > > Note that this is for providing direct integration with S3 as an Apache Beam > FileSystem. > There is already support for using the Hadoop S3 connector by depending on > the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with > a S3 configuration[3]. > 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system > 2: > https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53 > 3: https://wiki.apache.org/hadoop/AmazonS3 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2699) AppliedPTransform is used as a key in hashmaps but PTransform is not hashable/equality-comparable
[ https://issues.apache.org/jira/browse/BEAM-2699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166701#comment-16166701 ] ASF GitHub Bot commented on BEAM-2699: -- GitHub user tgroh opened a pull request: https://github.com/apache/beam/pull/3853 [BEAM-2699] Update AppliedPTransform Equality Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [ ] Each commit in the pull request should have a meaningful subject line and body. - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- Use only node name and the pipeline the two transforms are in. Do not consider the reported equality of inputs, outputs, or transforms. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/beam update_application_equality Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3853.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3853 commit 6615badce6f3dd2835ac8eac94d792595e86a576 Author: Thomas GrohDate: 2017-09-14T17:50:09Z Update AppliedPTransform Equality Use only node name and the pipeline the two transforms are in. Do not consider the reported equality of inputs, outputs, or transforms. > AppliedPTransform is used as a key in hashmaps but PTransform is not > hashable/equality-comparable > - > > Key: BEAM-2699 > URL: https://issues.apache.org/jira/browse/BEAM-2699 > Project: Beam > Issue Type: Bug > Components: runner-core >Reporter: Eugene Kirpichov >Assignee: Thomas Groh > > There's plenty of occurrences in runners-core of Map or BiMap where the key > is an AppliedPTransform. > However, PTransform does not advertise that it is required to implement > equals/hashCode, and some transforms can't do it properly anyway - for > example, transforms that capture a ValueProvider which is also not > hashable/eq-comparable. I'm surprised that things aren't already very broken > because of this. > Fundamentally, I don't see why we should ever compare two PTransform's for > equality. > I looked at the code and wondered "can AppliedPTransform simply be > identity-hashable", but right now the answer is no because we can create an > AppliedPTransform for the same transform applied to the same thing multiple > times. > Fixing that appears to be not very easy, but definitely possible. Ideally > TransformHierarchy.Node would just know its AppliedPTransform, however a Node > can be constructed when there's yet no Pipeline. 
Suppose there's gotta be > some way to propagate a Pipeline into Node.finishSpecifying() (which should > be called exactly once on the Node, and this should be enforced), and have > finishSpecifying() return the AppliedPTransform, and have the caller use that > instead of potentially repeatedly calling .toAppliedPTransform() on the same > Node. > [~kenn] is on vacation but perhaps [~tgroh] can help with this meanwhile? > CC: [~reuvenlax] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] beam pull request #3853: [BEAM-2699] Update AppliedPTransform Equality
GitHub user tgroh opened a pull request: https://github.com/apache/beam/pull/3853 [BEAM-2699] Update AppliedPTransform Equality Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [ ] Each commit in the pull request should have a meaningful subject line and body. - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- Use only node name and the pipeline the two transforms are in. Do not consider the reported equality of inputs, outputs, or transforms. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/beam update_application_equality Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3853.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3853 commit 6615badce6f3dd2835ac8eac94d792595e86a576 Author: Thomas GrohDate: 2017-09-14T17:50:09Z Update AppliedPTransform Equality Use only node name and the pipeline the two transforms are in. Do not consider the reported equality of inputs, outputs, or transforms. ---
[jira] [Commented] (BEAM-2500) Add support for S3 as an Apache Beam FileSystem
[ https://issues.apache.org/jira/browse/BEAM-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166678#comment-16166678 ] Chamikara Jayalath commented on BEAM-2500: -- Thanks for looking into this Jacob. I'll try to answer some of your questions. I agree that GCS is a good template. It's pretty well battle tested and used by all Dataflow pipelines. 5GB limit for FileSystem.copy(), though might be a good start, might not be enough for production use. Beam can parallelize reading of a large files. So there might be many users who are used to reading a small number of large files from their pipelines. Note that we use copy() operation to finalize files written using FileBasedSink implementations. So we might need to copy large files there as well. It'll be good if copy() can be implemented using multipart upload as you mentioned. FileSystem.create() is to create an empty WrittableByteChannel that will be written to later. So we'll have to have a way to stream bytes into S3 (some implementation of WrittableByteChannel). I'm not sure if S3 client library already supports this. 's3' for schema sounds good to me. For efficient parallelized reading FileSystem.open() should return an efficiently seekable SeekableByteChannel. Using copy() + delete() combination for rename() is fine. We might have to address issues related to providing credentials for accessing S3. See following JIRA where some details related to this were discussed when [~demeshchuk] looked into adding a S3 file system for Python SDK. https://issues.apache.org/jira/browse/BEAM-2572 > Add support for S3 as a Apache Beam FileSystem > -- > > Key: BEAM-2500 > URL: https://issues.apache.org/jira/browse/BEAM-2500 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Luke Cwik >Priority: Minor > Attachments: hadoop_fs_patch.patch > > > Note that this is for providing direct integration with S3 as an Apache Beam > FileSystem. 
> There is already support for using the Hadoop S3 connector by depending on > the Hadoop File System module[1], configuring HadoopFileSystemOptions[2] with > a S3 configuration[3]. > 1: https://github.com/apache/beam/tree/master/sdks/java/io/hadoop-file-system > 2: > https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemOptions.java#L53 > 3: https://wiki.apache.org/hadoop/AmazonS3 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint
[ https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1616#comment-1616 ] Luke Cwik commented on BEAM-2948: - FileSystems assumes that it has been initialized early on within each "worker". If this isn't honored then we may find other areas where this is also an issue. Also, once this is executing over the portability framework, this will be a non-issue since it will all happen in the SDK harness. > Unable to find registrar when restoring flink job from savepoint > > > Key: BEAM-2948 > URL: https://issues.apache.org/jira/browse/BEAM-2948 > Project: Beam > Issue Type: Bug > Components: runner-flink > Environment: Flink v1.2.1 job on a EMR cluster using Beam v2.0.0 >Reporter: Luke Cwik >Assignee: Aljoscha Krettek >Priority: Minor > > Reported: > https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E > When I try to restore job from savepoint on one task manager I get the > exception *Unable to find registrar for s3n*. > The job can write files to s3 acting as a sink. So S3 access works except > when restoring from savepoint. > I am passing the following configuration as the pipeline > HadoopFileSystemOptions > options: > Configuration configuration = new Configuration(); > configuration.set("fs.defaultFS", String.format("s3n://%s", > jobOptions.getOutputFileSystemRoot())); > configuration.set("fs.default.name", String.format("s3n://%s", > jobOptions.getOutputFileSystemRoot())); > configuration.set("fs.s3.impl", > "org.apache.hadoop.fs.s3native.NativeS3FileSystem"); > configuration.set("fs.s3n.awsAccessKeyId", > jobOptions.getAwsAccessKey()); > configuration.set("fs.s3n.awsSecretAccessKey", > jobOptions.getAwsSecretKey()); > From my investigation it looks like Beam FileSystems. 
> setDefaultPipelineOptions method is not called before > org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relay on initialised > FileSystems.SCHEME_TO_FILESYSTEM map > java.lang.IllegalStateException: Could not initialize keyed state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n > at > org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447) > at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523) > at > org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059) > at > org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020) > at > org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87) > at > org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87) > at > org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243) > at > 
org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284) > ... 6 more -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2885) Support job+artifact APIs locally
[ https://issues.apache.org/jira/browse/BEAM-2885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166645#comment-16166645 ] ASF GitHub Bot commented on BEAM-2885: -- GitHub user tgroh opened a pull request: https://github.com/apache/beam/pull/3852 [BEAM-2885] Add a Local FS implementation of the Artifact Staging API Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [ ] Each commit in the pull request should have a meaningful subject line and body. - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
--- You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/beam local_fs_artifact_staging_service Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3852.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3852 commit afa1a5adda499fb65176e5a5d00cd9cda89c49b6 Author: Thomas GrohDate: 2017-09-13T20:32:20Z Add a Local FS implementation of the Artifact Staging API > Support job+artifact APIs locally > - > > Key: BEAM-2885 > URL: https://issues.apache.org/jira/browse/BEAM-2885 > Project: Beam > Issue Type: Sub-task > Components: runner-dataflow >Reporter: Henning Rohde >Assignee: Thomas Groh > Labels: portability > > As per https://s.apache.org/beam-job-api, use local support for > submission-side. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] beam pull request #3852: [BEAM-2885] Add a Local FS implementation of the Ar...
GitHub user tgroh opened a pull request: https://github.com/apache/beam/pull/3852 [BEAM-2885] Add a Local FS implementation of the Artifact Staging API Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [ ] Each commit in the pull request should have a meaningful subject line and body. - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/tgroh/beam local_fs_artifact_staging_service Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3852.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3852 commit afa1a5adda499fb65176e5a5d00cd9cda89c49b6 Author: Thomas GrohDate: 2017-09-13T20:32:20Z Add a Local FS implementation of the Artifact Staging API ---
[jira] [Commented] (BEAM-2955) Create a Cloud Bigtable HBase connector
[ https://issues.apache.org/jira/browse/BEAM-2955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166508#comment-16166508 ] Chamikara Jayalath commented on BEAM-2955: -- I'm fine with option (2) as well. Extra ~300 lines of code shouldn't be much of an issue. I think we should take the option that makes more sense performance and usability wise. > Create a Cloud Bigtable HBase connector > --- > > Key: BEAM-2955 > URL: https://issues.apache.org/jira/browse/BEAM-2955 > Project: Beam > Issue Type: New Feature > Components: sdk-java-gcp >Reporter: Solomon Duskis >Assignee: Solomon Duskis > > The Cloud Bigtable (CBT) team has had a Dataflow connector maintained in a > different repo for awhile. Recently, we did some reworking of the Cloud > Bigtable client that would allow it to better coexist in the Beam ecosystem, > and we also released a Beam connector in our repository that exposes HBase > idioms rather than the Protobuf idioms of BigtableIO. More information about > the customer experience of the HBase connector can be found here: > [https://cloud.google.com/bigtable/docs/dataflow-hbase]. > The Beam repo is a much better place to house a Cloud Bigtable HBase > connector. There are a couple of ways we can implement this new connector: > # The CBT connector depends on artifacts in the io/hbase maven project. We > can create a new extend HBaseIO for the purposes of CBT. We would have to > add some features to HBaseIO to make that work (dynamic rebalancing, and a > way for HBase and CBT's size estimation models to coexist) > # The BigtableIO connector works well, and we can add an adapter layer on top > of it. I have a proof of concept of it here: > [https://github.com/sduskis/cloud-bigtable-client/tree/add_beam/bigtable-dataflow-parent/bigtable-hbase-beam]. > # We can build a separate CBT HBase connector. > I'm happy to do the work. I would appreciate some guidance and discussion > about the right approach. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException
[ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166404#comment-16166404 ] Guenther Grill commented on BEAM-2943: -- I double checked it, by looking into the `target/word-count-beam-bundled-0.1.jar` file. The class is there. > Beam Flink deployment results in ClassNotFoundException > --- > > Key: BEAM-2943 > URL: https://issues.apache.org/jira/browse/BEAM-2943 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.1.0 > Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 > (2017-08-06) x86_64 GNU/Linux >Reporter: Guenther Grill >Assignee: Aljoscha Krettek > Labels: flink > > Hi, > I followed the guide https://beam.apache.org/get-started/quickstart-java/ to > run beam program within a flink cluster. > The output of the dependency-command is: > {code} > mvn dependency:tree -Pflink-runner |grep flink > > [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime > [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime > [INFO]| \- org.apache.flink:force-shading:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-annotations:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime > {code} > Then I started the flink cluster with the correct version with docker-compose > {code} > export JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10 > docker-compose up -d > {code} > The compose file looks like this: > {code} > version: '3.3' > services: > jobmanager: > image: 
${FLINK_DOCKER_IMAGE_NAME:-flink} > expose: > - "6123" > ports: > - "6123:6123" > - "8081:8081" > volumes: > - /tmp:/tmp > command: jobmanager > environment: > - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > taskmanager: > image: ${FLINK_DOCKER_IMAGE_NAME:-flink} > expose: > - "6121" > - "6122" > depends_on: > - jobmanager > command: taskmanager > environment: > - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > {code} > The flink cluster works, but when I execute > {code} > mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ > -Pflink-runner \ > -Dexec.args="--runner=FlinkRunner \ > --inputFile=pom.xml \ > --output=/path/to/counts \ > --flinkMaster=[HOST_IP]:6123 \ > --filesToStage=target/word-count-beam-bundled-0.1.jar" > {code} > I get: > {code} > 2017-09-12 06:39:57,226 INFO org.apache.flink.runtime.jobmanager.JobManager > - Submitting job a913f922506053e65e732eeb8336b3bd > (wordcount-grg-0912063956-c7ea6199). > 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.jobmanager.JobManager > - Using restart strategy NoRestartStrategy for > a913f922506053e65e732eeb8336b3bd. > 2017-09-12 06:39:57,227 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers > via failover strategy: full graph restart > 2017-09-12 06:39:57,229 INFO org.apache.flink.runtime.jobmanager.JobManager > - Running initialization on master for job > wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd). 
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager > - Failed to submit job a913f922506053e65e732eeb8336b3bd > (wordcount-grg-0912063956-c7ea6199) > org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task > 'DataSource (at Read(CompressedSource) > (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': > Deserializing the InputFormat > (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) > failed: Could not read the user code wrapper: > org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153) > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495) > at >
[jira] [Comment Edited] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException
[ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166404#comment-16166404 ] Guenther Grill edited comment on BEAM-2943 at 9/14/17 2:57 PM: --- I double checked it, by looking into the {code}target/word-count-beam-bundled-0.1.jar{code} file. The class is there. was (Author: guenhter): I double checked it, by looking into the `target/word-count-beam-bundled-0.1.jar` file. The class is there. > Beam Flink deployment results in ClassNotFoundException > --- > > Key: BEAM-2943 > URL: https://issues.apache.org/jira/browse/BEAM-2943 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.1.0 > Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 > (2017-08-06) x86_64 GNU/Linux >Reporter: Guenther Grill >Assignee: Aljoscha Krettek > Labels: flink > > Hi, > I followed the guide https://beam.apache.org/get-started/quickstart-java/ to > run beam program within a flink cluster. > The output of the dependency-command is: > {code} > mvn dependency:tree -Pflink-runner |grep flink > > [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime > [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime > [INFO]| \- org.apache.flink:force-shading:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-annotations:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime > {code} > Then I started the flink cluster with the correct version with docker-compose > {code} > export JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > export 
FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10 > docker-compose up -d > {code} > The compose file looks like this: > {code} > version: '3.3' > services: > jobmanager: > image: ${FLINK_DOCKER_IMAGE_NAME:-flink} > expose: > - "6123" > ports: > - "6123:6123" > - "8081:8081" > volumes: > - /tmp:/tmp > command: jobmanager > environment: > - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > taskmanager: > image: ${FLINK_DOCKER_IMAGE_NAME:-flink} > expose: > - "6121" > - "6122" > depends_on: > - jobmanager > command: taskmanager > environment: > - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > {code} > The flink cluster works, but when I execute > {code} > mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ > -Pflink-runner \ > -Dexec.args="--runner=FlinkRunner \ > --inputFile=pom.xml \ > --output=/path/to/counts \ > --flinkMaster=[HOST_IP]:6123 \ > --filesToStage=target/word-count-beam-bundled-0.1.jar" > {code} > I get: > {code} > 2017-09-12 06:39:57,226 INFO org.apache.flink.runtime.jobmanager.JobManager > - Submitting job a913f922506053e65e732eeb8336b3bd > (wordcount-grg-0912063956-c7ea6199). > 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.jobmanager.JobManager > - Using restart strategy NoRestartStrategy for > a913f922506053e65e732eeb8336b3bd. > 2017-09-12 06:39:57,227 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers > via failover strategy: full graph restart > 2017-09-12 06:39:57,229 INFO org.apache.flink.runtime.jobmanager.JobManager > - Running initialization on master for job > wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd). 
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager > - Failed to submit job a913f922506053e65e732eeb8336b3bd > (wordcount-grg-0912063956-c7ea6199) > org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task > 'DataSource (at Read(CompressedSource) > (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': > Deserializing the InputFormat > (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) > failed: Could not read the user code wrapper: > org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153) > at >
[jira] [Updated] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException
[ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guenther Grill updated BEAM-2943: - Description: Hi, I followed the guide https://beam.apache.org/get-started/quickstart-java/ to run beam program within a flink cluster. The output of the dependency-command is: {code} mvn dependency:tree -Pflink-runner |grep flink [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime [INFO]| +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime [INFO]| \- org.apache.flink:force-shading:jar:1.3.0:runtime [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime [INFO]| +- org.apache.flink:flink-annotations:jar:1.3.0:runtime [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime [INFO]| +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime {code} Then I started the flink cluster with the correct version with docker-compose {code} export JOB_MANAGER_RPC_ADDRESS=[HOST_IP] export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10 docker-compose up -d {code} The compose file looks like this: {code} version: '3.3' services: jobmanager: image: ${FLINK_DOCKER_IMAGE_NAME:-flink} expose: - "6123" ports: - "6123:6123" - "8081:8081" volumes: - /tmp:/tmp command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] taskmanager: image: ${FLINK_DOCKER_IMAGE_NAME:-flink} expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager environment: - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] {code} The flink cluster works, but when I execute {code} mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Pflink-runner \ -Dexec.args="--runner=FlinkRunner \ --inputFile=pom.xml \ --output=/path/to/counts \ --flinkMaster=[HOST_IP]:6123 \ 
--filesToStage=target/word-count-beam-bundled-0.1.jar" {code} I get: {code} 2017-09-12 06:39:57,226 INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199). 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd. 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers via failover strategy: full graph restart 2017-09-12 06:39:57,229 INFO org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd). 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199) org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153) at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33) at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
[jira] [Commented] (BEAM-2943) Beam Flink deployment results in ClassNotFoundException
[ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166276#comment-16166276 ] Aljoscha Krettek commented on BEAM-2943: Could you verify that {{org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat}} is in the jar file? I just want to rule out the easy cases but it could still be a more involved problem. > Beam Flink deployment results in ClassNotFoundException > --- > > Key: BEAM-2943 > URL: https://issues.apache.org/jira/browse/BEAM-2943 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.1.0 > Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 > (2017-08-06) x86_64 GNU/Linux >Reporter: Guenther Grill >Assignee: Aljoscha Krettek > Labels: flink > > Hi, > I followed the guide https://beam.apache.org/documentation/runners/flink/ to > run beam program within a flink cluster. > The output of the dependency-command is: > {code} > mvn dependency:tree -Pflink-runner |grep flink > > [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime > [INFO]+- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime > [INFO]| \- org.apache.flink:force-shading:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-core:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-annotations:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-java:jar:1.3.0:runtime > [INFO]| +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime > [INFO]+- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime > {code} > Then I started the flink cluster with the correct version with docker-compose > {code} > export JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10 > docker-compose up -d > {code} > The compose file 
looks like this: > {code} > version: '3.3' > services: > jobmanager: > image: ${FLINK_DOCKER_IMAGE_NAME:-flink} > expose: > - "6123" > ports: > - "6123:6123" > - "8081:8081" > volumes: > - /tmp:/tmp > command: jobmanager > environment: > - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > taskmanager: > image: ${FLINK_DOCKER_IMAGE_NAME:-flink} > expose: > - "6121" > - "6122" > depends_on: > - jobmanager > command: taskmanager > environment: > - JOB_MANAGER_RPC_ADDRESS=[HOST_IP] > {code} > The flink cluster works, but when I execute > {code} > mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \ > -Pflink-runner \ > -Dexec.args="--runner=FlinkRunner \ > --inputFile=pom.xml \ > --output=/path/to/counts \ > --flinkMaster=[HOST_IP]:6123 \ > --filesToStage=target/word-count-beam-bundled-0.1.jar" > {code} > I get: > {code} > 2017-09-12 06:39:57,226 INFO org.apache.flink.runtime.jobmanager.JobManager > - Submitting job a913f922506053e65e732eeb8336b3bd > (wordcount-grg-0912063956-c7ea6199). > 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.jobmanager.JobManager > - Using restart strategy NoRestartStrategy for > a913f922506053e65e732eeb8336b3bd. > 2017-09-12 06:39:57,227 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph- Job recovers > via failover strategy: full graph restart > 2017-09-12 06:39:57,229 INFO org.apache.flink.runtime.jobmanager.JobManager > - Running initialization on master for job > wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd). 
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager > - Failed to submit job a913f922506053e65e732eeb8336b3bd > (wordcount-grg-0912063956-c7ea6199) > org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task > 'DataSource (at Read(CompressedSource) > (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': > Deserializing the InputFormat > (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) > failed: Could not read the user code wrapper: > org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153) > at > org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315) > at >
[jira] [Commented] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint
[ https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166124#comment-16166124 ] Aljoscha Krettek commented on BEAM-2948: I think the problem is that Beam will only "instantiate" the Hadoop options that are passed via pipeline options at a point when it is too late (This is done in {{FileSystem.setDefaultPipelineOptions()}}). The {{FileBasedSink}} uses a {{GroupByKey}} internally, which requires storing a {{FileResult}} in state. Decoding this (using {{FileResultCoder}}) will try and resolve the Hadoop FileSystem. When using the Heap state backend (or {{FsStateBackend}}) in Flink, state is eagerly deserialised when the pipeline/operator is started. I see some possible solutions: 1. Use the {{RocksDBStateBackend}} which deserialises state lazily so decoding should happen after the Hadoop options have been instantiated 2. We have to change {{FileResultCoder}} to not try and eagerly get a Hadoop Filesystem but instead do this lazily when required. > Unable to find registrar when restoring flink job from savepoint > > > Key: BEAM-2948 > URL: https://issues.apache.org/jira/browse/BEAM-2948 > Project: Beam > Issue Type: Bug > Components: runner-flink > Environment: Flink v1.2.1 job on a EMR cluster using Beam v2.0.0 >Reporter: Luke Cwik >Assignee: Aljoscha Krettek >Priority: Minor > > Reported: > https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E > When I try to restore job from savepoint on one task manager I get the > exception *Unable to find registrar for s3n*. > The job can write files to s3 acting as a sink. So S3 access works except > when restoring from savepoint. 
> I am passing the following configuration as the pipeline > HadoopFileSystemOptions > options: > Configuration configuration = new Configuration(); > configuration.set("fs.defaultFS", String.format("s3n://%s", > jobOptions.getOutputFileSystemRoot())); > configuration.set("fs.default.name", String.format("s3n://%s", > jobOptions.getOutputFileSystemRoot())); > configuration.set("fs.s3.impl", > "org.apache.hadoop.fs.s3native.NativeS3FileSystem"); > configuration.set("fs.s3n.awsAccessKeyId", > jobOptions.getAwsAccessKey()); > configuration.set("fs.s3n.awsSecretAccessKey", > jobOptions.getAwsSecretKey()); > From my investigation it looks like Beam FileSystems. > setDefaultPipelineOptions method is not called before > org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relay on initialised > FileSystems.SCHEME_TO_FILESYSTEM map > java.lang.IllegalStateException: Could not initialize keyed state backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n > at > org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447) > at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523) > at > org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059) > at > org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020) > at > 
org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87) > at > org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87) > at > org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284) > ... 6 more -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] beam pull request #3846: JStorm-runner: Remove unnecessary WARN log which mi...
Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/3846 ---
[2/2] beam git commit: This closes #3846
This closes #3846 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd9c548b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd9c548b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd9c548b Branch: refs/heads/jstorm-runner Commit: cd9c548b6e1ee57f3a202da64e5e55bc6704b67f Parents: ef70031 03cc311 Author: Pei HeAuthored: Thu Sep 14 19:29:31 2017 +0800 Committer: Pei He Committed: Thu Sep 14 19:29:31 2017 +0800 -- .../beam/runners/jstorm/translation/TranslatorRegistry.java| 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) --
[1/2] beam git commit: JStorm-runner: Remove unnecessary WARN log, which might cause confusion.
Repository: beam Updated Branches: refs/heads/jstorm-runner ef70031b7 -> cd9c548b6 JStorm-runner: Remove unnecessary WARN log, which might case confusion. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03cc311c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03cc311c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03cc311c Branch: refs/heads/jstorm-runner Commit: 03cc311cfbabd92390d7a848a135b59d9d80530c Parents: ef70031 Author: basti.ljAuthored: Wed Sep 13 16:17:14 2017 +0800 Committer: basti.lj Committed: Wed Sep 13 16:17:58 2017 +0800 -- .../beam/runners/jstorm/translation/TranslatorRegistry.java| 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/03cc311c/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java -- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java index c8ea545..f297dc3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TranslatorRegistry.java @@ -48,10 +48,6 @@ class TranslatorRegistry { } public static TransformTranslator getTranslator(PTransform transform) { -TransformTranslator translator = TRANSLATORS.get(transform.getClass()); -if (translator == null) { - LOG.warn("Unsupported operator={}", transform.getClass().getName()); -} -return translator; +return TRANSLATORS.get(transform.getClass()); } }
[jira] [Commented] (BEAM-2467) KinesisIO watermark based on approximateArrivalTimestamp
[ https://issues.apache.org/jira/browse/BEAM-2467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16166076#comment-16166076 ] ASF GitHub Bot commented on BEAM-2467: -- GitHub user pawel-kaczmarczyk opened a pull request: https://github.com/apache/beam/pull/3851 [BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp Follow this checklist to help us incorporate your contribution quickly and easily: - [X] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [X] Each commit in the pull request should have a meaningful subject line and body. - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [X] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [X] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). 
--- You can merge this pull request into a Git repository by running: $ git pull https://github.com/ocadotechnology/incubator-beam kinesis_watermark Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3851.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3851 commit 7757ca61fb4b392469ebfd0d378cd7a61f46ae48 Author: Pawel KaczmarczykDate: 2017-09-14T10:52:16Z [BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp > KinesisIO watermark based on approximateArrivalTimestamp > > > Key: BEAM-2467 > URL: https://issues.apache.org/jira/browse/BEAM-2467 > Project: Beam > Issue Type: Improvement > Components: sdk-java-extensions >Reporter: Paweł Kaczmarczyk >Assignee: Paweł Kaczmarczyk > > In Kinesis we can start reading the stream at some point in the past during > the retention period (up to 7 days). With current approach for setting > record's timestamp and watermark (both are always set to current time, i.e. > Instant.now()), we can't observe the actual position in the stream. > So the idea is to change this behaviour and set the record timestamp based on > the > [ApproximateArrivalTimestamp|http://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp]. > Watermark will be set accordingly to the last read record's timestamp. > ApproximateArrivalTimestamp is still some approximation and may result in > having records with out-of-order timestamp's which in turn may result in some > events marked as late. This however should not be a frequent issue and even > if it happens it should be a matter of milliseconds or seconds so can be > handled even with a tiny allowedLateness setting -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] beam pull request #3851: [BEAM-2467] Kinesis source watermark based on appro...
GitHub user pawel-kaczmarczyk opened a pull request: https://github.com/apache/beam/pull/3851 [BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp Follow this checklist to help us incorporate your contribution quickly and easily: - [X] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [X] Each commit in the pull request should have a meaningful subject line and body. - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [X] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [X] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/ocadotechnology/incubator-beam kinesis_watermark Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3851.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3851 commit 7757ca61fb4b392469ebfd0d378cd7a61f46ae48 Author: Pawel KaczmarczykDate: 2017-09-14T10:52:16Z [BEAM-2467] Kinesis source watermark based on approximateArrivalTimestamp ---
[jira] [Commented] (BEAM-2790) Error while reading from Amazon S3 via Hadoop File System
[ https://issues.apache.org/jira/browse/BEAM-2790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16165999#comment-16165999 ] ASF GitHub Bot commented on BEAM-2790: -- GitHub user echauchot opened a pull request: https://github.com/apache/beam/pull/3850 [BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem Follow this checklist to help us incorporate your contribution quickly and easily: - [X] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [X] Each commit in the pull request should have a meaningful subject line and body. - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [X] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [X] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [X] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- This is the continuation of PR https://github.com/apache/beam/pull/3744. It fixes the code to deal with the offsets/limits to have the same behavior than read(ByteBuffer) and it adds corner cases tests. 
R: @lukecwik @steveloughran CC: @iemejia You can merge this pull request into a Git repository by running: $ git pull https://github.com/echauchot/beam byte_array_instead_of_ByBuffer_HadoopFS2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3850.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3850 commit ed5ba0c2ecc6e3a0b305cb9928d610f8ab58acbe Author: Etienne ChauchotDate: 2017-09-14T08:28:12Z [BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem > Error while reading from Amazon S3 via Hadoop File System > - > > Key: BEAM-2790 > URL: https://issues.apache.org/jira/browse/BEAM-2790 > Project: Beam > Issue Type: Bug > Components: sdk-java-extensions >Affects Versions: 2.0.0, 2.1.0 >Reporter: Ismaël Mejía >Assignee: Etienne Chauchot > Fix For: 2.2.0 > > > If you try to use hadoop-aws with Beam to read from AWS S3 it breaks because > S3AInputStream (the implementation of Hadoop's FSDataInputStream) is not > ByteBufferReadable. 
> {code} > Exception in thread "main" java.lang.UnsupportedOperationException: > Byte-buffer read unsupported by input stream > at > org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:146) > at > org.apache.beam.sdk.io.hdfs.HadoopFileSystem$HadoopSeekableByteChannel.read(HadoopFileSystem.java:192) > at > org.apache.beam.sdk.io.TextSource$TextBasedReader.tryToEnsureNumberOfBytesInBuffer(TextSource.java:232) > at > org.apache.beam.sdk.io.TextSource$TextBasedReader.findSeparatorBounds(TextSource.java:166) > at > org.apache.beam.sdk.io.TextSource$TextBasedReader.readNextRecord(TextSource.java:198) > at > org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:481) > at > org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:476) > at > org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:261) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] beam pull request #3850: [BEAM-2790] Use byte[] instead of ByteBuffer to rea...
GitHub user echauchot opened a pull request: https://github.com/apache/beam/pull/3850 [BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem Follow this checklist to help us incorporate your contribution quickly and easily: - [X] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes. - [X] Each commit in the pull request should have a meaningful subject line and body. - [X] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue. - [X] Write a pull request description that is detailed enough to understand what the pull request does, how, and why. - [X] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically. - [X] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). --- This is the continuation of PR https://github.com/apache/beam/pull/3744. It fixes the code to deal with the offsets/limits to have the same behavior than read(ByteBuffer) and it adds corner cases tests. 
R: @lukecwik @steveloughran CC: @iemejia You can merge this pull request into a Git repository by running: $ git pull https://github.com/echauchot/beam byte_array_instead_of_ByBuffer_HadoopFS2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/beam/pull/3850.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3850 commit ed5ba0c2ecc6e3a0b305cb9928d610f8ab58acbe Author: Etienne ChauchotDate: 2017-09-14T08:28:12Z [BEAM-2790] Use byte[] instead of ByteBuffer to read from HadoopFilesystem ---
[GitHub] beam pull request #3841: flink-runner: constructs exception string only when...
Github user asfgit closed the pull request at: https://github.com/apache/beam/pull/3841 ---
[2/2] beam git commit: This closes #3841
This closes #3841 Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa4ecea2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa4ecea2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa4ecea2 Branch: refs/heads/master Commit: fa4ecea26713244a83521ce1ca2864f4ad4409c8 Parents: 50532f0 31f51d2 Author: Pei HeAuthored: Thu Sep 14 17:21:09 2017 +0800 Committer: Pei He Committed: Thu Sep 14 17:21:09 2017 +0800 -- .../translation/functions/FlinkAssignContext.java | 17 - 1 file changed, 8 insertions(+), 9 deletions(-) --
[1/2] beam git commit: flink-runner: constructs exception string only when necessary; it reduces per-element expensive calls (String.format and getSimpleName) in FlinkAssignContext.
Repository: beam Updated Branches: refs/heads/master 50532f0a9 -> fa4ecea26 flink-runner: constructs exception string only when neccessary, it reduces per-element expensive calls(String.format and getSimpleName) in FlinkAssignContext. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31f51d28 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31f51d28 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31f51d28 Branch: refs/heads/master Commit: 31f51d28c574ea1792312a528b25793230787486 Parents: 50532f0 Author: Pei HeAuthored: Tue Sep 12 17:26:28 2017 +0800 Committer: Pei He Committed: Thu Sep 14 17:20:54 2017 +0800 -- .../translation/functions/FlinkAssignContext.java | 17 - 1 file changed, 8 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/beam/blob/31f51d28/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java -- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java index 447b1e5..26d6721 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.flink.translation.functions; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -35,13 +33,14 @@ class FlinkAssignContext FlinkAssignContext(WindowFn fn, WindowedValue value) { fn.super(); -checkArgument( -Iterables.size(value.getWindows()) == 1, -String.format( -"%s passed to window assignment must be in a single 
window, but it was in %s: %s", -WindowedValue.class.getSimpleName(), -Iterables.size(value.getWindows()), -value.getWindows())); +if (Iterables.size(value.getWindows()) != 1) { + throw new IllegalArgumentException( + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); +} this.value = value; }