[GitHub] incubator-beam pull request #514: Add Travis config for Python SDK tests
Github user aaltay closed the pull request at: https://github.com/apache/incubator-beam/pull/514 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: This closes #514
This closes #514

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d5719a5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d5719a5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d5719a5a
Branch: refs/heads/python-sdk
Commit: d5719a5aaf2c980d19daa99b99cb6e031c02a3bc
Parents: e3a43fb bd8b5ad
Author: Davor Bonaci
Authored: Wed Jun 22 18:56:36 2016 -0700
Committer: Davor Bonaci
Committed: Wed Jun 22 18:56:36 2016 -0700
--
 .travis.yml         | 22 +++---
 sdks/python/tox.ini | 24
 2 files changed, 43 insertions(+), 3 deletions(-)
--
[1/2] incubator-beam git commit: Travis config for python tests
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e3a43fb5c -> d5719a5aa

Travis config for python tests

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bd8b5ad2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bd8b5ad2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bd8b5ad2
Branch: refs/heads/python-sdk
Commit: bd8b5ad2a4fb966d732ecf17f7ed846b6ef0e27f
Parents: e3a43fb
Author: Ahmet Altay
Authored: Tue Jun 21 14:18:24 2016 -0700
Committer: Davor Bonaci
Committed: Wed Jun 22 18:56:02 2016 -0700
--
 .travis.yml         | 22 +++---
 sdks/python/tox.ini | 24
 2 files changed, 43 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd8b5ad2/.travis.yml
--
diff --git a/.travis.yml b/.travis.yml
index 6d81689..ec74fa4 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -28,6 +28,11 @@ notifications:
     on_success: change
     on_failure: always
 
+addons:
+  apt:
+    packages:
+    - python2.7
+
 matrix:
   include:
     # On OSX, run with default JDK only.
@@ -40,15 +45,26 @@ matrix:
       env: CUSTOM_JDK="oraclejdk7" MAVEN_OVERRIDE="-DforkCount=0"
     - os: linux
       env: CUSTOM_JDK="openjdk7" MAVEN_OVERRIDE="-DforkCount=0"
+    # Python SDK environments.
+    - os: osx
+      env: TEST_PYTHON="1"
+    - os: linux
+      env: TEST_PYTHON="1"
 
 before_install:
   - echo "MAVEN_OPTS='-Xmx2048m -XX:MaxPermSize=512m'" > ~/.mavenrc
   - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
   - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
+  # Python SDK environment settings.
+  - export TOX_ENV=py27
+  - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export TOX_HOME=$HOME/Library/Python/2.7/bin; fi
+  - if [ "$TRAVIS_OS_NAME" == "linux" ]; then export TOX_HOME=$HOME/.local/bin; fi
 
 install:
-  - travis_retry mvn -B install clean -U -DskipTests=true
+  - if [ ! "$TEST_PYTHON" ]; then travis_retry mvn -B install clean -U -DskipTests=true; fi
+  - if [ "$TEST_PYTHON" ]; then travis_retry pip install tox --user `whoami`; fi
 
 script:
-  - travis_retry mvn -B $MAVEN_OVERRIDE install -U
-  - travis_retry testing/travis/test_wordcount.sh
+  - if [ "$TEST_PYTHON" ]; then travis_retry $TOX_HOME/tox -e $TOX_ENV -c sdks/python/tox.ini; fi
+  - if [ ! "$TEST_PYTHON" ]; then travis_retry mvn -B $MAVEN_OVERRIDE install -U; fi
+  - if [ ! "$TEST_PYTHON" ]; then travis_retry testing/travis/test_wordcount.sh; fi

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bd8b5ad2/sdks/python/tox.ini
--
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
new file mode 100644
index 000..c0208b5
--- /dev/null
+++ b/sdks/python/tox.ini
@@ -0,0 +1,24 @@
+;
+; Licensed to the Apache Software Foundation (ASF) under one or more
+; contributor license agreements. See the NOTICE file distributed with
+; this work for additional information regarding copyright ownership.
+; The ASF licenses this file to You under the Apache License, Version 2.0
+; (the "License"); you may not use this file except in compliance with
+; the License. You may obtain a copy of the License at
+;
+;     http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+;
+
+[tox]
+envlist = py27
+
+[testenv:py27]
+commands =
+    python setup.py test
+passenv = TRAVIS*
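The install/script changes above route each Travis matrix entry either to the Python tox suite or to the existing Java Maven build, keyed on the `TEST_PYTHON` environment variable. Purely as an illustration of that gating logic, here is a small Python sketch; `select_job` is a hypothetical helper, not anything in the PR:

```python
# Sketch (not part of the PR): the routing that the .travis.yml above
# implements with shell `if [ "$TEST_PYTHON" ]` guards in install/script.
def select_job(env):
    if env.get("TEST_PYTHON"):
        # Python SDK matrix entries run the tox environment.
        return "tox -e py27 -c sdks/python/tox.ini"
    # All other entries keep the existing Maven build.
    return "mvn -B install -U"

print(select_job({"TEST_PYTHON": "1"}))  # the new Python SDK entries
print(select_job({}))                    # the pre-existing Java entries
```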
[GitHub] incubator-beam pull request #293: Remove Dataflow runner references in WordC...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/293
[1/2] incubator-beam git commit: Remove Dataflow runner references in WordCount examples.
Repository: incubator-beam
Updated Branches:
  refs/heads/master e0cae9fb6 -> 748b0c8da

Remove Dataflow runner references in WordCount examples.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a554f062
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a554f062
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a554f062
Branch: refs/heads/master
Commit: a554f062f32d0807a6840928eb0685943a808d42
Parents: e0cae9f
Author: Pei He
Authored: Fri Jun 17 15:39:59 2016 -0700
Committer: Davor Bonaci
Committed: Wed Jun 22 18:49:54 2016 -0700
--
 examples/java/pom.xml                          | 26 -
 .../apache/beam/examples/MinimalWordCount.java | 26 +++--
 examples/java8/pom.xml                         | 20 -
 .../beam/examples/MinimalWordCountJava8.java   | 30 +++-
 4 files changed, 60 insertions(+), 42 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a554f062/examples/java/pom.xml
--
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index cac9857..223334f 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -221,19 +221,6 @@
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-direct-java</artifactId>
-      <version>${project.version}</version>
-      <scope>runtime</scope>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-
     <dependency>
       <groupId>com.google.api-client</groupId>
       <artifactId>google-api-client</artifactId>
@@ -294,6 +281,19 @@
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-jdk14</artifactId>
       <scope>runtime</scope>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a554f062/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
--
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 6d4bfd4..355a1ff 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++
b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -17,10 +17,9 @@
  */
 package org.apache.beam.examples;
 
-import org.apache.beam.runners.dataflow.BlockingDataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -63,13 +62,22 @@ public class MinimalWordCount {
     // Create a PipelineOptions object. This object lets us set various execution
     // options for our pipeline, such as the associated Cloud Platform project and the location
     // in Google Cloud Storage to stage files.
-    DataflowPipelineOptions options = PipelineOptionsFactory.create()
-        .as(DataflowPipelineOptions.class);
-    options.setRunner(BlockingDataflowRunner.class);
-    // CHANGE 1/3: Your project ID is required in order to run your pipeline on the Google Cloud.
-    options.setProject("SET_YOUR_PROJECT_ID_HERE");
-    // CHANGE 2/3: Your Google Cloud Storage path is required for staging local files.
-    options.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // In order to run your pipeline, you need to make following runner specific changes:
+    //
+    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
+    // or FlinkPipelineRunner.
+    // CHANGE 2/3: Specify runner-required options.
+    // For BlockingDataflowRunner, set project and temp location as follows:
+    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+    //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
+    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
+    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
+    // For FlinkPipelineRunner, set the runner as follows. See {@code FlinkPipelineOptions}
+    // for more details.
+    //   options.as(FlinkPipelineOptions.class)
+    //       .setRunner(FlinkPipelineRunner.class);
 
     // Create the Pipeline object with the options we defined above.
     Pipeline p =
[2/2] incubator-beam git commit: This closes #293
This closes #293

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/748b0c8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/748b0c8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/748b0c8d
Branch: refs/heads/master
Commit: 748b0c8da99f1942f230302f707266f46ca3520f
Parents: e0cae9f a554f06
Author: Davor Bonaci
Authored: Wed Jun 22 18:50:30 2016 -0700
Committer: Davor Bonaci
Committed: Wed Jun 22 18:50:30 2016 -0700
--
 examples/java/pom.xml                          | 26 -
 .../apache/beam/examples/MinimalWordCount.java | 26 +++--
 examples/java8/pom.xml                         | 20 -
 .../beam/examples/MinimalWordCountJava8.java   | 30 +++-
 4 files changed, 60 insertions(+), 42 deletions(-)
--
[jira] [Commented] (BEAM-37) Run DoFnWithContext without conversion to vanilla DoFn
[ https://issues.apache.org/jira/browse/BEAM-37?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345529#comment-15345529 ]

ASF GitHub Bot commented on BEAM-37:

GitHub user bjchambers opened a pull request:

    https://github.com/apache/incubator-beam/pull/521

    [BEAM-37] DoFnReflector: Add invoker interface and generate code

    Be sure to do all of the following to help us incorporate your contribution quickly and easily:

    - [*] Make sure the PR title is formatted like: `[BEAM-] Description of pull request`
    - [*] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes.)
    - [*] Replace `` in the title with the actual Jira issue number, if there is one.
    - [*] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---

    The method to call for a DoFnWithContext requires reflection, since the shape of the parameters may change. Doing so in each processElement call puts this reflection in the hot path. This PR introduces a DoFnInvoker interface which is bound to a specific DoFnWithContext and delegates the three important methods (startBundle, processElement, finishBundle). It uses byte-buddy to generate a simple trampoline implementation of the DoFnInvoker class for each type of DoFnWithContext. This leads to 2-3x better performance in micro-benchmarks of method dispatching.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bjchambers/incubator-beam byte-buddy-dofn

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/521.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #521

commit 009be346ecbf4e2db899982f6070e2208e47be10
Author: Ben Chambers
Date: 2016-06-22T13:47:23Z

    [BEAM-37] DoFnReflector: Add invoker interface and generate code

    The method to call for a DoFnWithContext requires reflection, since the shape of the parameters may change. Doing so in each processElement call puts this reflection in the hot path. This PR introduces a DoFnInvoker interface which is bound to a specific DoFnWithContext and delegates the three important methods (startBundle, processElement, finishBundle). It uses byte-buddy to generate a simple trampoline implementation of the DoFnInvoker class for each type of DoFnWithContext. This leads to 2-3x better performance in micro-benchmarks of method dispatching.

> Run DoFnWithContext without conversion to vanilla DoFn
> ------------------------------------------------------
>
>                 Key: BEAM-37
>                 URL: https://issues.apache.org/jira/browse/BEAM-37
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-core
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>
> DoFnWithContext is an enhanced DoFn where annotations and parameter lists are
> inspected to determine whether it accesses windowing information, etc.
> Today, each feature of DoFnWithContext requires implementation on DoFn, which
> precludes the easy addition of features that we don't have designs for in DoFn.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] incubator-beam pull request #521: [BEAM-37] DoFnReflector: Add invoker interf...
GitHub user bjchambers opened a pull request:

    https://github.com/apache/incubator-beam/pull/521

    [BEAM-37] DoFnReflector: Add invoker interface and generate code

    Be sure to do all of the following to help us incorporate your contribution quickly and easily:

    - [*] Make sure the PR title is formatted like: `[BEAM-] Description of pull request`
    - [*] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes.)
    - [*] Replace `` in the title with the actual Jira issue number, if there is one.
    - [*] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

    ---

    The method to call for a DoFnWithContext requires reflection, since the shape of the parameters may change. Doing so in each processElement call puts this reflection in the hot path. This PR introduces a DoFnInvoker interface which is bound to a specific DoFnWithContext and delegates the three important methods (startBundle, processElement, finishBundle). It uses byte-buddy to generate a simple trampoline implementation of the DoFnInvoker class for each type of DoFnWithContext. This leads to 2-3x better performance in micro-benchmarks of method dispatching.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/bjchambers/incubator-beam byte-buddy-dofn

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/521.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #521

commit 009be346ecbf4e2db899982f6070e2208e47be10
Author: Ben Chambers
Date: 2016-06-22T13:47:23Z

    [BEAM-37] DoFnReflector: Add invoker interface and generate code

    The method to call for a DoFnWithContext requires reflection, since the shape of the parameters may change.
Doing so in each processElement call puts this reflection in the hot path. This PR introduces a DoFnInvoker interface which is bound to a specific DoFnWithContext and delegates the three important methods (startBundle, processElement, finishBundle). It uses byte-buddy to generate a simple trampoline implementation of the DoFnInvoker class for each type of DoFnWithContext. This leads to 2-3x better performance in micro-benchmarks of method dispatching.
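The core idea of the PR above is to pay the reflection cost once per DoFn rather than on every element. As a hedged illustration only (the actual change is Java code generated with byte-buddy; these class and method names are invented for the sketch), the same pattern looks like this in Python:

```python
# Sketch of the invoker pattern: bind the target method once at
# construction time instead of looking it up on every call.

class MyDoFn:
    def process_element(self, x):
        return x * 2

def reflective_invoke(do_fn, x):
    # Per-call lookup: "reflection" in the hot path.
    return getattr(do_fn, "process_element")(x)

class DoFnInvoker:
    def __init__(self, do_fn):
        # Resolve the method a single time, up front.
        self._process_element = do_fn.process_element

    def invoke_process_element(self, x):
        # Direct call through the pre-bound reference.
        return self._process_element(x)

fn = MyDoFn()
invoker = DoFnInvoker(fn)
# Both dispatch paths produce the same result; only the lookup cost differs.
assert reflective_invoke(fn, 21) == invoker.invoke_process_element(21) == 42
```

In the Java version, byte-buddy generates a small trampoline class per DoFnWithContext type, which plays the role of `DoFnInvoker` here.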
[GitHub] incubator-beam pull request #502: Move allowsDynamicSplitting to Reader, and...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/502
[1/2] incubator-beam git commit: Move allowsDynamicSplitting to Reader, and set it in CompressedSource.
Repository: incubator-beam
Updated Branches:
  refs/heads/master 8949ec315 -> e0cae9fb6

Move allowsDynamicSplitting to Reader, and set it in CompressedSource.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b7e9a7e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b7e9a7e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b7e9a7e8
Branch: refs/heads/master
Commit: b7e9a7e8fa467641dfe1c19fa5d5f63f4b74a6d0
Parents: 8949ec3
Author: Pei He
Authored: Fri Jun 17 18:24:06 2016 -0700
Committer: Dan Halperin
Committed: Wed Jun 22 18:07:16 2016 -0700
--
 .../apache/beam/sdk/io/CompressedSource.java  |  5 +++
 .../apache/beam/sdk/io/OffsetBasedSource.java | 26 +++
 .../beam/sdk/io/CompressedSourceTest.java     | 35
 3 files changed, 53 insertions(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
index a5c54b3..75bfc8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java
@@ -396,6 +396,11 @@ public class CompressedSource extends FileBasedSource {
     }
 
     @Override
+    public boolean allowsDynamicSplitting() {
+      return splittable;
+    }
+
+    @Override
     public final long getSplitPointsConsumed() {
       if (splittable) {
         return readerDelegate.getSplitPointsConsumed();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
index 2f62acd..295eab9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
@@ -192,17 +192,6 @@ public abstract class OffsetBasedSource extends BoundedSource {
    */
   public abstract OffsetBasedSource createSourceForSubrange(long start, long end);
 
-  /**
-   * Whether this source should allow dynamic splitting of the offset ranges.
-   *
-   * True by default. Override this to return false if the source cannot
-   * support dynamic splitting correctly. If this returns false,
-   * {@link OffsetBasedSource.OffsetBasedReader#splitAtFraction} will refuse all split requests.
-   */
-  public boolean allowsDynamicSplitting() {
-    return true;
-  }
-
   @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
@@ -342,7 +331,7 @@ public abstract class OffsetBasedSource extends BoundedSource {
       // Note that even if the current source does not allow splitting, we don't know that
       // it's non-empty so we return UNKNOWN instead of 1.
       return BoundedReader.SPLIT_POINTS_UNKNOWN;
-    } else if (!getCurrentSource().allowsDynamicSplitting()) {
+    } else if (!allowsDynamicSplitting()) {
       // Started (so non-empty) and unsplittable, so only the current task.
       return 1;
     } else if (getCurrentOffset() >= rangeTracker.getStopPosition() - 1) {
@@ -355,9 +344,20 @@ public abstract class OffsetBasedSource extends BoundedSource {
       }
     }
 
+    /**
+     * Whether this reader should allow dynamic splitting of the offset ranges.
+     *
+     * True by default. Override this to return false if the reader cannot
+     * support dynamic splitting correctly. If this returns false,
+     * {@link OffsetBasedReader#splitAtFraction} will refuse all split requests.
+     */
+    public boolean allowsDynamicSplitting() {
+      return true;
+    }
+
     @Override
     public final synchronized OffsetBasedSource splitAtFraction(double fraction) {
-      if (!getCurrentSource().allowsDynamicSplitting()) {
+      if (!allowsDynamicSplitting()) {
        return null;
       }
       if (rangeTracker.getStopPosition() == Long.MAX_VALUE) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b7e9a7e8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
--
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
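The contract this commit moves onto the reader (a reader that reports `allowsDynamicSplitting() == false` must have `splitAtFraction` refuse every request) can be sketched compactly. This is a hedged Python analogue with illustrative names and a made-up range; it is not Beam API:

```python
# Sketch of the reader-side splitting contract from the commit above:
# allowsDynamicSplitting lives on the reader, and splitAtFraction must
# refuse all requests when it returns False.

class OffsetBasedReader:
    def __init__(self, splittable=True, stop=100):
        self._splittable = splittable
        self._stop = stop  # assumed end offset, for the sketch only

    def allows_dynamic_splitting(self):
        # True by default; a wrapper like CompressedSource's reader would
        # override/configure this to False for non-splittable input.
        return self._splittable

    def split_at_fraction(self, fraction):
        if not self.allows_dynamic_splitting():
            return None  # refuse every split request
        # Otherwise propose a split offset within the range (simplified).
        return int(self._stop * fraction)

# A compressed (unsplittable) reader refuses; a plain one accepts.
assert OffsetBasedReader(splittable=False).split_at_fraction(0.5) is None
assert OffsetBasedReader(splittable=True).split_at_fraction(0.5) == 50
```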
[2/2] incubator-beam git commit: Closes #502
Closes #502

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e0cae9fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e0cae9fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e0cae9fb
Branch: refs/heads/master
Commit: e0cae9fb691c53f5aeb1c43c52ff4354d4680f2e
Parents: 8949ec3 b7e9a7e
Author: Dan Halperin
Authored: Wed Jun 22 18:07:17 2016 -0700
Committer: Dan Halperin
Committed: Wed Jun 22 18:07:17 2016 -0700
--
 .../apache/beam/sdk/io/CompressedSource.java  |  5 +++
 .../apache/beam/sdk/io/OffsetBasedSource.java | 26 +++
 .../beam/sdk/io/CompressedSourceTest.java     | 35
 3 files changed, 53 insertions(+), 13 deletions(-)
--
[jira] [Commented] (BEAM-91) Retractions
[ https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345507#comment-15345507 ]

Matt Pouttu-Clarke commented on BEAM-91:

With regard to versioning structural changes: removing a field is one example, as is changing the type of a field. In this case one must replay all relevant history with the change applied AND, more importantly, quickly identify the root cause of failures related to the structural change. With regard to retaining "deleted" data and relationships, the best real example I have is versioned hierarchical structures like zip codes and sales territories. You cannot reject mail because the zip code has changed or moved, and sales people will have a conniption if their numbers change and affect their commissions. Thus in the real world these historical structures remain frozen in time, potentially forever, even when they are "deleted".

> Retractions
> -----------
>
>                 Key: BEAM-91
>                 URL: https://issues.apache.org/jira/browse/BEAM-91
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Tyler Akidau
>            Assignee: Frances Perry
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> We still haven't added retractions to Beam, even though they're a core part
> of the model. We should document all the necessary aspects (uncombine,
> reverting DoFn output with DoOvers, sink integration, source-level
> retractions, etc), and then implement them.
[jira] [Commented] (BEAM-91) Retractions
[ https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345476#comment-15345476 ]

Matt Pouttu-Clarke commented on BEAM-91:

Yes, agreed, it is not clear yet from the docs how this relates directly to Beam; however, this is mainly a terminology issue from my perspective. The bespoke systems I have built over the last few years to unify batch and stream processing all rely on data versioning to ensure point-in-session consistency (watermarks) across streams and all data derived from streams, such as aggregates, transforms, splits, and replicas. There is no hard dependency on a configuration service, but it is critical to keep the current watermark and all historical watermarks in a system of record. This could be as simple as a shared file system or as complex as etcd. That aside, the versioning model I set forth for flatbuffers is an example using more recent technologies. I have done the same with relational tables and Avro in the past. I'll work on the examples of how the versioning model feeds aggregate refresh, and hopefully it will become clearer.
[jira] [Closed] (BEAM-359) AvroCoder should be able to handle anonymous classes as schemas
[ https://issues.apache.org/jira/browse/BEAM-359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Chambers closed BEAM-359.
-----------------------------
    Resolution: Fixed
    Fix Version/s: 0.2.0-incubating

> AvroCoder should be able to handle anonymous classes as schemas
> ---------------------------------------------------------------
>
>                 Key: BEAM-359
>                 URL: https://issues.apache.org/jira/browse/BEAM-359
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 0.1.0-incubating, 0.2.0-incubating
>            Reporter: Daniel Mills
>            Assignee: Ben Chambers
>             Fix For: 0.2.0-incubating
>
> Currently, the determinism checker NPEs with:
> java.lang.IllegalArgumentException: Unable to get field id from class null
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.getField(AvroCoder.java:710)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:548)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:567)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:430)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder.<init>(AvroCoder.java:189)
>   at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:144)
>   at mypackage.GenericsTest$1.create(GenericsTest.java:102)
>   at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoderFromFactory(CoderRegistry.java:797)
>   at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:748)
>   at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:719)
>   at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:696)
>   at com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:178)
>   at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:147)
>   at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
[jira] [Commented] (BEAM-91) Retractions
[ https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345427#comment-15345427 ] Tyler Akidau commented on BEAM-91: -- Hi Matt, thanks a lot for writing all that up. My initial comment would be that this sounds like a systems-level approach towards providing general versioning of data that may change over time. That's really cool. I think the problem of retractions within Beam is actually much more constrained, though, so I'm not sure we'd need that full level of generality. And as a portable layer, we also generally will need to avoid taking system-level dependencies like zookeeper, etc. Runner can choose to take such dependencies, but the framework itself needs to be very careful about such things, typically formulating them via contracts in the model rather than direct dependencies to actual systems. The set of problems we need to tackle with retractions in Beam boils down, I think, to essentially these: 1. Support remembering the last published panes for a window, so we can retract them in the future. Can be done with Beam's persistent state layer. Will need to support storing multiple previous panes in the case of merging windows like sessions. This is probably pretty straightforward. 2. Support annotating retraction elements as retractions. This will be some form of metadata on the record. Also relatively straightforward. 3. Support retraction propagation. This are is progressively more interesting as you go down the list of sub tasks, I think: a. Inputs which are retractions always generate outputs which are retractions. So records which are retractions should always be processed independently from normal data, so the DoFn itself need not be aware of whether the data are retractions or not. b. For specific use cases, we may want to provide a way for a DoFn to find out if it is processing retractions. But having a concrete use case for this would be nice before committing to do so. c. 
CombineFn will need a retractInput method to complement addInput, which will essentially uncombine the given values from the accumulator. d. Sinks will need to be made retraction aware. Dan Halperin may have thoughts here. e. Ideally, we would come up with a scheme to make retractions compatible with non-deterministic DoFns. This is expensive in the general case (remember all inputs and their corresponding outputs, so that you can re-produce that output as a retraction when you see a corresponding input). Would be cool if we can come up with something smarter, but I'm not sure what that would be. It may be that we simply need to provide a way to annotate a DoFn as non-deterministic to ensure that the expensive-but-correct mode supporting non-determinism is used. Additional things we could consider adding: 4. Support for publishing retractions directly from sources. This would allow for the input data themselves to be annotated as retractions for use cases where it is known ahead of time that you're retracting a previously provided value. Given that, I'd be curious to hear your thoughts on how Bloklinx relates to this. There doesn't seem to be sufficient information in the existing docs for me to do that well, beyond seeing that it appears to solve a similar, but more general problem in a self-contained system. One thing you mention above that isn't covered here is retractions in light of structural changes. From the perspective of providing a highly general solution, I see why that makes sense. But I'd be curious to hear your thoughts on real world use cases where that's applicable. That ties into the much larger question of supporting pipeline updates more cleanly within the Beam model itself, which itself is an interesting area to explore in the future. But I've never considered the idea of actually retracting data from portions of the pipeline that have been removed, and I can't immediately come up with use cases where that would be desirable. 
Any light you could shed here would be appreciated. > Retractions > --- > > Key: BEAM-91 > URL: https://issues.apache.org/jira/browse/BEAM-91 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Tyler Akidau >Assignee: Frances Perry > Original Estimate: 672h > Remaining Estimate: 672h > > We still haven't added retractions to Beam, even though they're a core part > of the model. We should document all the necessary aspects (uncombine, > reverting DoFn output with DoOvers, sink integration, source-level > retractions, etc), and then implement them. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
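The uncombine operation described in point 3c can be illustrated with a small sketch. Note that this is purely hypothetical: Beam's CombineFn has no retractInput method today, and SumFn below is a stand-alone stand-in invented for exposition, not the actual CombineFn interface.

```java
// Hypothetical sketch of a combiner with a retractInput ("uncombine")
// method complementing addInput, per point 3c. Not an actual Beam API.
import java.util.Arrays;
import java.util.List;

public class RetractableSum {
  // A minimal retractable combiner: the accumulator is a running sum,
  // so retracting an input simply subtracts its contribution back out.
  static class SumFn {
    long createAccumulator() { return 0L; }
    long addInput(long accumulator, long input) { return accumulator + input; }
    // Hypothetical complement of addInput: removes a previously added
    // input's contribution from the accumulator.
    long retractInput(long accumulator, long input) { return accumulator - input; }
    long extractOutput(long accumulator) { return accumulator; }
  }

  public static void main(String[] args) {
    SumFn fn = new SumFn();
    long acc = fn.createAccumulator();
    List<Long> inputs = Arrays.asList(3L, 5L, 7L);
    for (long in : inputs) {
      acc = fn.addInput(acc, in);
    }
    System.out.println(fn.extractOutput(acc)); // 15
    // A retraction arrives for the previously combined value 5.
    acc = fn.retractInput(acc, 5L);
    System.out.println(fn.extractOutput(acc)); // 10
  }
}
```

Sum is the easy case because it forms a group under addition; for combiners like max, retraction would require keeping more state than a single accumulator value, which is part of why this design question is non-trivial.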
[jira] [Created] (BEAM-369) Add travis config to website
James Malone created BEAM-369: - Summary: Add travis config to website Key: BEAM-369 URL: https://issues.apache.org/jira/browse/BEAM-369 Project: Beam Issue Type: Bug Components: website Reporter: James Malone Assignee: James Malone Add a Travis CI config to the Beam website.
[GitHub] incubator-beam-site pull request #24: Travis CI YAML config
GitHub user evilsoapbox opened a pull request: https://github.com/apache/incubator-beam-site/pull/24 Travis CI YAML config Adding a Travis CI configuration file so we can use Travis on the Beam website. You can merge this pull request into a Git repository by running: $ git pull https://github.com/evilsoapbox/incubator-beam-site asf-site Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam-site/pull/24.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #24 commit e981291ca5c96e4f9097fd10d2362fc9a3dc94dd Author: James MaloneDate: 2016-06-22T22:07:45Z Travis CI YAML config --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (BEAM-359) AvroCoder should be able to handle anonymous classes as schemas
[ https://issues.apache.org/jira/browse/BEAM-359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345247#comment-15345247 ] ASF GitHub Bot commented on BEAM-359: - Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/513 > AvroCoder should be able to handle anonymous classes as schemas > --- > > Key: BEAM-359 > URL: https://issues.apache.org/jira/browse/BEAM-359 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Affects Versions: 0.1.0-incubating, 0.2.0-incubating >Reporter: Daniel Mills >Assignee: Ben Chambers > > Currently, the determinism checker NPEs with: > java.lang.IllegalArgumentException: Unable to get field id from class null > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.getField(AvroCoder.java:710) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:548) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.checkRecord(AvroCoder.java:567) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.doCheck(AvroCoder.java:477) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.recurse(AvroCoder.java:453) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder$AvroDeterminismChecker.check(AvroCoder.java:430) > at > com.google.cloud.dataflow.sdk.coders.AvroCoder.(AvroCoder.java:189) > at com.google.cloud.dataflow.sdk.coders.AvroCoder.of(AvroCoder.java:144) > at mypackage.GenericsTest$1.create(GenericsTest.java:102) > at > com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoderFromFactory(CoderRegistry.java:797) > at > com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:748) > at > 
com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:719) > at > com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:696) > at > com.google.cloud.dataflow.sdk.coders.CoderRegistry.getDefaultCoder(CoderRegistry.java:178) > at > com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:147) > at > com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
[GitHub] incubator-beam pull request #513: Fix BEAM-359
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/513
[1/2] incubator-beam git commit: [BEAM-359] Treat erased type variables as non-deterministic in AvroCoder
Repository: incubator-beam Updated Branches: refs/heads/master 9bcf9b0a7 -> 8949ec315 [BEAM-359] Treat erased type variables as non-deterministic in AvroCoder Previously, loss of type information due to erasure would lead to an IllegalArgumentException in the constructor. Now, the coder is created and usable but treated as non-deterministic. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9c8dc4fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9c8dc4fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9c8dc4fe Branch: refs/heads/master Commit: 9c8dc4fe920618253b425f0d998f8d63552ec358 Parents: 9bcf9b0 Author: bchambersAuthored: Tue Jun 21 13:42:33 2016 -0700 Committer: bchambers Committed: Wed Jun 22 14:33:31 2016 -0700 -- .../org/apache/beam/sdk/coders/AvroCoder.java | 10 +--- .../apache/beam/sdk/coders/AvroCoderTest.java | 26 2 files changed, 33 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c8dc4fe/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java -- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 3b93ec3..00c1cbc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -475,6 +475,10 @@ public class AvroCoder extends StandardCoder { checkMap(context, type, schema); break; case RECORD: + if (!(type.getType() instanceof Class)) { +reportError(context, "Cannot determine type from generic %s due to erasure", type); +return; + } checkRecord(type, schema); break; case UNION: @@ -695,7 +699,8 @@ public class AvroCoder extends StandardCoder { * Extract a field from a class. We need to look at the declared fields so that we can * see private fields. 
We may need to walk up to the parent to get classes from the parent. */ -private static Field getField(Class clazz, String name) { +private static Field getField(Class originalClazz, String name) { + Class clazz = originalClazz; while (clazz != null) { for (Field field : clazz.getDeclaredFields()) { AvroName avroName = field.getAnnotation(AvroName.class); @@ -708,8 +713,7 @@ public class AvroCoder extends StandardCoder { clazz = clazz.getSuperclass(); } - throw new IllegalArgumentException( - "Unable to get field " + name + " from class " + clazz); + throw new IllegalArgumentException("Unable to get field " + name + " from " + originalClazz); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9c8dc4fe/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java -- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 8f28cc4..207bfdd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -755,4 +755,30 @@ public class AvroCoderTest { return Objects.hash(getClass(), onlySomeTypesAllowed); } } + + @Test + public void testAvroCoderForGenerics() throws Exception { +Schema fooSchema = AvroCoder.of(Foo.class).getSchema(); +Schema schema = new Schema.Parser().parse("{" ++ "\"type\":\"record\"," ++ "\"name\":\"SomeGeneric\"," ++ "\"namespace\":\"ns\"," ++ "\"fields\":[" ++ " {\"name\":\"foo\", \"type\":" + fooSchema.toString() + "}" ++ "]}"); +@SuppressWarnings("rawtypes") +AvroCoder coder = AvroCoder.of(SomeGeneric.class, schema); + +assertNonDeterministic(coder, +reasonField(SomeGeneric.class, "foo", "erasure")); + } + + private static class SomeGeneric { +@SuppressWarnings("unused") +private T foo; + } + private static class Foo { +@SuppressWarnings("unused") +String id; + } }
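The erasure problem the commit works around can be reproduced with plain reflection, independent of AvroCoder: the declared type of a generic field is a TypeVariable rather than a Class, so the actual field type cannot be recovered at runtime. A minimal stand-alone illustration (the class names here are invented for the example):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {
  // Mirrors the SomeGeneric<T> shape used in the commit's test.
  static class SomeGeneric<T> {
    T foo;
  }

  public static void main(String[] args) throws Exception {
    Field field = SomeGeneric.class.getDeclaredField("foo");
    Type type = field.getGenericType();
    // The declared type of "foo" is the type variable T, not a Class;
    // after erasure there is no way to recover the actual type argument.
    // This is exactly the "!(type.getType() instanceof Class)" case the
    // commit detects, and why the coder can only be treated as
    // non-deterministic rather than constructed to fail.
    System.out.println(type instanceof TypeVariable); // true
    System.out.println(type instanceof Class);        // false
  }
}
```

This is why the fix reports "Cannot determine type from generic %s due to erasure" instead of letting the determinism checker walk into a null class and NPE, as in the stack trace from BEAM-359.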
[2/2] incubator-beam git commit: This closes #513
This closes #513 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8949ec31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8949ec31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8949ec31 Branch: refs/heads/master Commit: 8949ec3159d552be108819d7951b649c00271dd7 Parents: 9bcf9b0 9c8dc4f Author: bchambers Authored: Wed Jun 22 14:35:28 2016 -0700 Committer: bchambers Committed: Wed Jun 22 14:35:28 2016 -0700 -- .../org/apache/beam/sdk/coders/AvroCoder.java | 10 +--- .../apache/beam/sdk/coders/AvroCoderTest.java | 26 2 files changed, 33 insertions(+), 3 deletions(-) --
[1/2] incubator-beam git commit: Fix a typo in FlinkRunner.toString
Repository: incubator-beam Updated Branches: refs/heads/master c83e5c48f -> 9bcf9b0a7 Fix a typo in FlinkRunner.toString Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b3aa73df Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b3aa73df Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b3aa73df Branch: refs/heads/master Commit: b3aa73df7086e7a5241ca7dec39085a2b679aacf Parents: c83e5c4 Author: peiheAuthored: Tue Jun 21 11:05:46 2016 -0700 Committer: Davor Bonaci Committed: Wed Jun 22 13:48:31 2016 -0700 -- .../src/main/java/org/apache/beam/runners/flink/FlinkRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b3aa73df/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java -- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java index d8c5c12..47c4877 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java @@ -144,7 +144,7 @@ public class FlinkRunner extends PipelineRunner { @Override public String toString() { -return "DataflowRunner#" + hashCode(); +return "FlinkRunner#" + hashCode(); } /**
[GitHub] incubator-beam pull request #511: Fix a typo in FlinkRunner.toString
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/511
[2/2] incubator-beam git commit: This closes #511
This closes #511 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9bcf9b0a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9bcf9b0a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9bcf9b0a Branch: refs/heads/master Commit: 9bcf9b0a77b6247c69622771a3db5ef3be49dfd1 Parents: c83e5c4 b3aa73d Author: Davor BonaciAuthored: Wed Jun 22 13:48:40 2016 -0700 Committer: Davor Bonaci Committed: Wed Jun 22 13:48:40 2016 -0700 -- .../src/main/java/org/apache/beam/runners/flink/FlinkRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[jira] [Commented] (BEAM-91) Retractions
[ https://issues.apache.org/jira/browse/BEAM-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15345012#comment-15345012 ] Matt Pouttu-Clarke commented on BEAM-91: Added information to the local repo (similar to a git local repo): https://github.com/LamdaFu/bloklinx/wiki/Local-Repo Working on the Bloklinx swarm design next (i.e. what happens during and after a push) > Retractions > --- > > Key: BEAM-91 > URL: https://issues.apache.org/jira/browse/BEAM-91 > Project: Beam > Issue Type: New Feature > Components: beam-model >Reporter: Tyler Akidau >Assignee: Frances Perry > Original Estimate: 672h > Remaining Estimate: 672h > > We still haven't added retractions to Beam, even though they're a core part > of the model. We should document all the necessary aspects (uncombine, > reverting DoFn output with DoOvers, sink integration, source-level > retractions, etc), and then implement them.
[jira] [Commented] (BEAM-115) Beam Runner API
[ https://issues.apache.org/jira/browse/BEAM-115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344778#comment-15344778 ] ASF GitHub Bot commented on BEAM-115: - GitHub user kennknowles opened a pull request: https://github.com/apache/incubator-beam/pull/520 [BEAM-115] Add ViewFn to SDK and adjust PCollectionView impls Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `` in the title with the actual Jira issue number, if there is one. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This is a small adjustment that reflects the language-independent architecture in the Java code, leaving the prior structure in place temporarily for ease of migration. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kennknowles/incubator-beam ViewFn Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/520.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #520 commit 56204137e4dbaaf1ea30770fb7927f27fa777805 Author: Kenneth KnowlesDate: 2016-06-22T15:39:33Z Add ViewFn to SDK and adjust PCollectionView impls > Beam Runner API > --- > > Key: BEAM-115 > URL: https://issues.apache.org/jira/browse/BEAM-115 > Project: Beam > Issue Type: Improvement > Components: runner-core >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles > > The PipelineRunner API from the SDK is not ideal for the Beam technical > vision. 
> It has technical limitations: > - The user's DAG (even including library expansions) is never explicitly > represented, so it cannot be analyzed except incrementally, and cannot > necessarily be reconstructed (for example, to display it!). > - The flattened DAG of just primitive transforms isn't well-suited for > display or transform override. > - The TransformHierarchy isn't well-suited for optimizations. > - The user must realistically pre-commit to a runner, and its configuration > (batch vs streaming) prior to graph construction, since the runner will be > modifying the graph as it is built. > - It is fairly language- and SDK-specific. > It has usability issues (these are not from intuition, but derived from > actual cases of failure to use according to the design) > - The interleaving of apply() methods in PTransform/Pipeline/PipelineRunner > is confusing. > - The TransformHierarchy, accessible only via visitor traversals, is > cumbersome. > - The staging of construction-time vs run-time is not always obvious. > These are just examples. This ticket tracks designing, coming to consensus, > and building an API that more simply and directly supports the technical > vision.
[GitHub] incubator-beam pull request #520: [BEAM-115] Add ViewFn to SDK and adjust PC...
GitHub user kennknowles opened a pull request: https://github.com/apache/incubator-beam/pull/520 [BEAM-115] Add ViewFn to SDK and adjust PCollectionView impls Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [x] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [x] Replace `` in the title with the actual Jira issue number, if there is one. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- This is a small adjustment that reflects the language-independent architecture in the Java code, leaving the prior structure in place temporarily for ease of migration. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kennknowles/incubator-beam ViewFn Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/520.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #520 commit 56204137e4dbaaf1ea30770fb7927f27fa777805 Author: Kenneth Knowles Date: 2016-06-22T15:39:33Z Add ViewFn to SDK and adjust PCollectionView impls
[jira] [Closed] (BEAM-33) Make DataflowAssert more window-aware
[ https://issues.apache.org/jira/browse/BEAM-33?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Halperin closed BEAM-33. --- > Make DataflowAssert more window-aware > - > > Key: BEAM-33 > URL: https://issues.apache.org/jira/browse/BEAM-33 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Thomas Groh > Labels: Triggers, Windowing > > Today DataflowAssert rewindows into the global window before performing a > side input (as an implementation detail). This precludes support for other > windowing schemes and triggers.
[jira] [Resolved] (BEAM-33) Make DataflowAssert more window-aware
[ https://issues.apache.org/jira/browse/BEAM-33?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Halperin resolved BEAM-33. - Resolution: Duplicate > Make DataflowAssert more window-aware > - > > Key: BEAM-33 > URL: https://issues.apache.org/jira/browse/BEAM-33 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Thomas Groh > Labels: Triggers, Windowing > > Today DataflowAssert rewindows into the global window before performing a > side input (as an implementation detail). This precludes support for other > windowing schemes and triggers.
[1/2] incubator-beam git commit: Deduplicate Unbounded Reads
Repository: incubator-beam Updated Branches: refs/heads/master 3bc0fe669 -> c83e5c48f Deduplicate Unbounded Reads This ensures that sources that produce duplicate elements that are marked as requiresDeduplication are handled by the DirectRunner. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/569228e4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/569228e4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/569228e4 Branch: refs/heads/master Commit: 569228e4c5fc2e3a722d8f3089d75cc6fc197d93 Parents: 3bc0fe6 Author: Thomas GrohAuthored: Wed Jun 15 14:41:09 2016 -0700 Committer: Dan Halperin Committed: Wed Jun 22 09:57:23 2016 -0700 -- .../direct/UnboundedReadDeduplicator.java | 102 ++ .../direct/UnboundedReadEvaluatorFactory.java | 17 ++- .../direct/UnboundedReadDeduplicatorTest.java | 134 +++ .../UnboundedReadEvaluatorFactoryTest.java | 50 ++- 4 files changed, 299 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569228e4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java -- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java new file mode 100644 index 000..0246236 --- /dev/null +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadDeduplicator.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.io.Read.Unbounded; +import org.apache.beam.sdk.transforms.PTransform; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +import org.joda.time.Duration; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Provides methods to determine if a record is a duplicate within the evaluation of a + * {@link Unbounded} {@link PTransform}. + */ +interface UnboundedReadDeduplicator { + /** + * Returns true if the record with the provided ID should be output, and false if it should not + * be because it is a duplicate. + */ + boolean shouldOutput(byte[] recordId); + + /** + * An {@link UnboundedReadDeduplicator} that always returns true. For use with sources do not + * require deduplication. + */ + class NeverDeduplicator implements UnboundedReadDeduplicator { +/** + * Create a new {@link NeverDeduplicator}. + */ +public static UnboundedReadDeduplicator create() { + return new NeverDeduplicator(); +} + +@Override +public boolean shouldOutput(byte[] recordId) { + return true; +} + } + + + /** + * An {@link UnboundedReadDeduplicator} that returns true if the record ID has not been seen + * within 10 minutes. 
+ */ + class CachedIdDeduplicator implements UnboundedReadDeduplicator { +private static final ByteArrayCoder RECORD_ID_CODER = ByteArrayCoder.of(); +private static final long MAX_RETENTION_SINCE_ACCESS = +Duration.standardMinutes(10L).getMillis(); + +private final LoadingCache , AtomicBoolean> ids; + +/** + * Create a new {@link CachedIdDeduplicator}. + */ +public static UnboundedReadDeduplicator create() { + return new CachedIdDeduplicator(); +} + +private CachedIdDeduplicator() { + ids = CacheBuilder.newBuilder() + .expireAfterAccess(MAX_RETENTION_SINCE_ACCESS, TimeUnit.MILLISECONDS) + .maximumSize(100_000L) + .build(new TrueBooleanLoader()); +} + +@Override +public boolean shouldOutput(byte[] recordId) { +
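The CachedIdDeduplicator above relies on a Guava cache bounded by size (100,000 entries) and access time (10 minutes). The underlying idea, remembering record IDs and outputting only the first occurrence, can be sketched without Guava using an access-ordered LinkedHashMap. This is a simplified stand-in (it bounds only size, not retention time) and is not the actual Beam implementation:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class SimpleDeduplicator {
  private static final int MAX_IDS = 100_000;

  // Access-ordered map that evicts the least-recently-seen IDs once the
  // size bound is reached (the Guava cache additionally expires entries
  // 10 minutes after last access).
  private final Map<String, Boolean> seen =
      new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
          return size() > MAX_IDS;
        }
      };

  /** Returns true the first time a record ID is seen, false for duplicates. */
  public boolean shouldOutput(byte[] recordId) {
    // Keyed on a lossless string form of the bytes; the real code works
    // with the coder-encoded bytes directly.
    String key = new String(recordId, StandardCharsets.ISO_8859_1);
    return seen.put(key, Boolean.TRUE) == null;
  }

  public static void main(String[] args) {
    SimpleDeduplicator dedup = new SimpleDeduplicator();
    byte[] id = "record-1".getBytes(StandardCharsets.UTF_8);
    System.out.println(dedup.shouldOutput(id)); // true  (first occurrence)
    System.out.println(dedup.shouldOutput(id)); // false (duplicate)
  }
}
```

Bounding both size and retention matters in the real implementation: an unbounded seen-ID set would grow forever on an unbounded source, while too-aggressive eviction risks re-emitting a late duplicate.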
[GitHub] incubator-beam pull request #473: Deduplicate Unbounded Reads
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/473
[2/2] incubator-beam git commit: Closes #473
Closes #473 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c83e5c48 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c83e5c48 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c83e5c48 Branch: refs/heads/master Commit: c83e5c48f002003f60e06a3dd41c83aa2a702a9a Parents: 3bc0fe6 569228e Author: Dan HalperinAuthored: Wed Jun 22 09:57:24 2016 -0700 Committer: Dan Halperin Committed: Wed Jun 22 09:57:24 2016 -0700 -- .../direct/UnboundedReadDeduplicator.java | 102 ++ .../direct/UnboundedReadEvaluatorFactory.java | 17 ++- .../direct/UnboundedReadDeduplicatorTest.java | 134 +++ .../UnboundedReadEvaluatorFactoryTest.java | 50 ++- 4 files changed, 299 insertions(+), 4 deletions(-) --
[jira] [Commented] (BEAM-354) Modify DatastoreIO to use Datastore v1beta3 API
[ https://issues.apache.org/jira/browse/BEAM-354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344733#comment-15344733 ] ASF GitHub Bot commented on BEAM-354: - Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/499 > Modify DatastoreIO to use Datastore v1beta3 API > --- > > Key: BEAM-354 > URL: https://issues.apache.org/jira/browse/BEAM-354 > Project: Beam > Issue Type: Improvement > Components: sdk-java-gcp >Reporter: Vikas Kedigehalli >Assignee: Vikas Kedigehalli > > Datastore v1beta2 API is getting deprecated in favor of v1beta3. Hence the > DatastoreIO needs to be migrated to use the new version. Also in the process > of doing so, this is a good time to add a level of indirection via a > PTransform such that future changes in the Datastore API would not result in > changing user/pipeline code.
[2/2] incubator-beam git commit: DatastoreIO: Update datastore API to v1beta3
DatastoreIO: Update datastore API to v1beta3 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/104f4dd2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/104f4dd2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/104f4dd2 Branch: refs/heads/master Commit: 104f4dd27dae9643f953d964b31a0d7674cbf1e9 Parents: f480944 Author: Vikas KedigehalliAuthored: Thu Jun 16 13:57:43 2016 -0700 Committer: Dan Halperin Committed: Wed Jun 22 09:54:51 2016 -0700 -- examples/java/pom.xml | 15 +- .../beam/examples/complete/AutoComplete.java| 35 +-- .../examples/cookbook/DatastoreWordCount.java | 48 ++-- pom.xml | 44 +-- runners/google-cloud-dataflow-java/pom.xml | 11 +- .../dataflow/io/DataflowDatastoreIOTest.java| 15 +- sdks/java/core/pom.xml | 9 +- .../org/apache/beam/sdk/coders/EntityCoder.java | 87 -- .../org/apache/beam/sdk/io/DatastoreIO.java | 286 --- .../apache/beam/sdk/coders/EntityCoderTest.java | 110 --- .../sdk/coders/protobuf/ProtobufUtilTest.java | 7 +- .../org/apache/beam/sdk/io/DatastoreIOTest.java | 171 +-- .../apache/beam/sdk/util/ApiSurfaceTest.java| 4 +- 13 files changed, 279 insertions(+), 563 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/pom.xml -- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 5167810..cac9857 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -260,11 +260,6 @@ com.google.apis - google-api-services-datastore-protobuf - - - - com.google.apis google-api-services-pubsub @@ -279,6 +274,16 @@ + com.google.cloud.datastore + datastore-v1beta3-proto-client + + + + com.google.cloud.datastore + datastore-v1beta3-protos + + + joda-time joda-time http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java -- diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index ef47762..c6893f4 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -17,6 +17,8 @@ */ package org.apache.beam.examples.complete; +import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue; + import org.apache.beam.examples.common.DataflowExampleUtils; import org.apache.beam.examples.common.ExampleBigQueryTableOptions; import org.apache.beam.examples.common.ExamplePubsubTopicOptions; @@ -55,18 +57,20 @@ import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.datastore.DatastoreV1.Entity; -import com.google.api.services.datastore.DatastoreV1.Key; -import com.google.api.services.datastore.DatastoreV1.Value; -import com.google.api.services.datastore.client.DatastoreHelper; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.datastore.v1beta3.Entity; +import com.google.datastore.v1beta3.Key; +import com.google.datastore.v1beta3.Value; +import com.google.datastore.v1beta3.client.DatastoreHelper; import org.joda.time.Duration; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -396,16 +400,15 @@ public class AutoComplete { entityBuilder.setKey(key); List candidates = new ArrayList<>(); + Map properties = new HashMap<>(); for (CompletionCandidate tag : c.element().getValue()) { Entity.Builder tagEntity = Entity.newBuilder(); -tagEntity.addProperty( 
-DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value))); -tagEntity.addProperty( -DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count))); - candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build()); +properties.put("tag",
[GitHub] incubator-beam pull request #499: [BEAM-354]: Modify DatastoreIO to use the ...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/499
[1/2] incubator-beam git commit: Closes #499
Repository: incubator-beam Updated Branches: refs/heads/master f4809446b -> 3bc0fe669 Closes #499 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bc0fe66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bc0fe66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bc0fe66 Branch: refs/heads/master Commit: 3bc0fe6699ccc3d499b195928bb56b6ba9939025 Parents: f480944 104f4dd Author: Dan Halperin Authored: Wed Jun 22 09:54:51 2016 -0700 Committer: Dan Halperin Committed: Wed Jun 22 09:54:51 2016 -0700 -- examples/java/pom.xml | 15 +- .../beam/examples/complete/AutoComplete.java| 35 +-- .../examples/cookbook/DatastoreWordCount.java | 48 ++-- pom.xml | 44 +-- runners/google-cloud-dataflow-java/pom.xml | 11 +- .../dataflow/io/DataflowDatastoreIOTest.java| 15 +- sdks/java/core/pom.xml | 9 +- .../org/apache/beam/sdk/coders/EntityCoder.java | 87 -- .../org/apache/beam/sdk/io/DatastoreIO.java | 286 --- .../apache/beam/sdk/coders/EntityCoderTest.java | 110 --- .../sdk/coders/protobuf/ProtobufUtilTest.java | 7 +- .../org/apache/beam/sdk/io/DatastoreIOTest.java | 171 +-- .../apache/beam/sdk/util/ApiSurfaceTest.java| 4 +- 13 files changed, 279 insertions(+), 563 deletions(-) --
[jira] [Commented] (BEAM-33) Make DataflowAssert more window-aware
[ https://issues.apache.org/jira/browse/BEAM-33?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344715#comment-15344715 ] Thomas Groh commented on BEAM-33: - Effectively equivalent to https://issues.apache.org/jira/browse/BEAM-155 > Make DataflowAssert more window-aware > - > > Key: BEAM-33 > URL: https://issues.apache.org/jira/browse/BEAM-33 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Thomas Groh > Labels: Triggers, Windowing > > Today DataflowAssert rewindows into the global window before performing a > side input (as an implementation detail). This precludes support for other > windowing schemes and triggers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (BEAM-33) Make DataflowAssert more window-aware
[ https://issues.apache.org/jira/browse/BEAM-33?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Groh reassigned BEAM-33: --- Assignee: Thomas Groh > Make DataflowAssert more window-aware > - > > Key: BEAM-33 > URL: https://issues.apache.org/jira/browse/BEAM-33 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core >Reporter: Kenneth Knowles >Assignee: Thomas Groh > Labels: Triggers, Windowing > > Today DataflowAssert rewindows into the global window before performing a > side input (as an implementation detail). This precludes support for other > windowing schemes and triggers.
[jira] [Commented] (BEAM-365) TextIO withoutSharding causes Flink to throw IllegalStateException
[ https://issues.apache.org/jira/browse/BEAM-365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15344157#comment-15344157 ] Maximilian Michels commented on BEAM-365: - Thanks for reporting! Would you like to work on this [~kkl0u]? > TextIO withoutSharding causes Flink to throw IllegalStateException > -- > > Key: BEAM-365 > URL: https://issues.apache.org/jira/browse/BEAM-365 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 0.2.0-incubating >Reporter: Pawel Szczur > > The exception: > {code}java.lang.IllegalStateException: Shard name template '' only generated > 1 distinct file names for 3 files{code} > The initial discussion took place some time ago, the {{withoutSharding}} was > then silently ignored by the runner. > Explanation from Aljoscha Krettek: > {quote} > Hi, > the issue is a bit more complicated and involves the Beam sink API and the > Flink runner. > I'll have to get a bit into how Beam sinks work. The base class for sinks > is Sink (TextIO.write gets translated to Write.to(new TextSink())). > Write.to normally gets translated to three ParDo operations that cooperate > to do the writing: > - "Initialize": this does initial initialization of the Sink, this is run > only once, per sink, non-parallel. > - "WriteBundles": this gets an initialized sink on a side-input and the > values to write on the main input. This runs in parallel, so for Flink, if > you set parallelism=6 you'll get 6 parallel instances of this operation at > runtime. This operation forwards information about where it writes to > downstream. This does not write to the final file location but an > intermediate staging location. > - "Finalize": This gets the initialized sink on the main-input and the > information about written files from "WriteBundles" as a side-input. This > also only runs once, non-parallel. Here we're writing the intermediate > files to a final location based on the sharding template. 
> The problem is that Write.to() and TextSink, as well as all other sinks, > are not aware of the number of shards. If you set "withoutSharding()" this > will set the shard template to "" (empty string) and the number of shards > to 1. "WriteBundles", however, is not aware of this and will write 6 > intermediate files if you set parallelism=6. In "Finalize" we will copy an > intermediate file to the same final location 6 times based on the sharding > template. The end result is that you only get one of the six result shards. > The reason why this only occurs in the Flink runner is that all other > runners have special overrides for TextIO.Write and AvroIO.Write that kick > in if sharding control is required. So, for the time being this is a Flink > runner bug and we might have to introduce special overrides as well until > this is solved in the general case. > Cheers, > Aljoscha > {quote} > Original discussion: > http://mail-archives.apache.org/mod_mbox/incubator-beam-user/201606.mbox/%3CCAMdX74-VPUsNOc9NKue2A2tYXZisnHNZ7UkPWk82_TFexpnySg%40mail.gmail.com%3E
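The IllegalStateException quoted in the issue follows directly from how shard name templates expand: with the default template every shard maps to a distinct file name, while the empty template set by withoutSharding() collapses all shards onto one name. A simplified Python sketch of the naming scheme (the helper name and details are illustrative, not Beam's actual FileBasedSink code):

```python
import re

def expand_shard_template(prefix, template, suffix, shard, num_shards):
    """Expand a shard name template such as '-SSSSS-of-NNNNN'.

    Runs of 'S' become the zero-padded shard index; runs of 'N' become the
    zero-padded shard count. Simplified sketch, not Beam's implementation.
    """
    def repl(match):
        token = match.group(0)
        value = shard if token[0] == 'S' else num_shards
        return str(value).zfill(len(token))
    return prefix + re.sub(r'S+|N+', repl, template) + suffix

# Default template: 3 shards expand to 3 distinct file names.
distinct = {expand_shard_template('out', '-SSSSS-of-NNNNN', '.txt', i, 3)
            for i in range(3)}
# Empty template (what withoutSharding() sets): 3 shards collapse to 1 name,
# hence "only generated 1 distinct file names for 3 files".
collapsed = {expand_shard_template('out', '', '.txt', i, 3) for i in range(3)}
```

Because "Finalize" copies each intermediate file to a final name derived this way, six parallel bundles with an empty template all target the same destination, and only one shard survives.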
[GitHub] incubator-beam pull request #519: avoid to fail cause of \r of windows on EO...
GitHub user rmannibucau opened a pull request: https://github.com/apache/incubator-beam/pull/519 avoid to fail cause of \r of windows on EOL + fixing copyOne from Fil… Lighter version of https://github.com/apache/incubator-beam/pull/496 (basically removed the hadoop workaround) Build doesn't fully pass out of the box (= without setting up hadoop locally) but it doesn't fail for simple issues like EOL differences. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmannibucau/incubator-beam BEAM-357_path-handling-fails-on-windows Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/519.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #519 commit 2d3c9fee013c04fbad7013c3e334a5fae73aa53b Author: Romain manni-Bucau Date: 2016-06-22T08:42:45Z avoid to fail cause of \r of windows on EOL + fixing copyOne from FileBasedSink which fails on windows hard drive syntax (colon) and avoid to have issues cause a folder can't be deleted in WriteSinkITCase - easily happens on windows cause of locking mechanism ---
[GitHub] incubator-beam pull request #453: Wait for Elements to be fetched in KafkaIO...
Github user asfgit closed the pull request at: https://github.com/apache/incubator-beam/pull/453 ---
[1/2] incubator-beam git commit: Wait for Elements to be fetched in KafkaIO#start
Repository: incubator-beam Updated Branches: refs/heads/master 3ff98ea23 -> f4809446b Wait for Elements to be fetched in KafkaIO#start This makes it more likely that the reader has elements after the call to start returns. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15f69edf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15f69edf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15f69edf Branch: refs/heads/master Commit: 15f69edf80237152739a737e7e84c9ec933d372c Parents: 3ff98ea Author: Thomas GrohAuthored: Mon Jun 13 15:32:19 2016 -0700 Committer: Dan Halperin Committed: Tue Jun 21 23:28:02 2016 -0700 -- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15f69edf/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java -- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index d540a8d..3b64bd5 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -759,6 +759,8 @@ public class KafkaIO { private Iterator curBatch = Collections.emptyIterator(); private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000); +// how long to wait for new records from kafka consumer inside start() +private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5); // how long to wait for new records from kafka consumer inside advance() private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10); @@ -891,12 +893,12 @@ public class KafkaIO { LOG.info("{}: Returning from consumer pool loop", this); } -private void nextBatch() { +private void 
nextBatch(Duration timeout) { curBatch = Collections.emptyIterator(); ConsumerRecords records; try { -records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(), +records = availableRecordsQueue.poll(timeout.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -965,6 +967,9 @@ public class KafkaIO { } }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS); + // Wait for longer than normal when fetching a batch to improve chances a record is available + // when start() returns. + nextBatch(START_NEW_RECORDS_POLL_TIMEOUT); return advance(); } @@ -1028,7 +1033,7 @@ public class KafkaIO { return true; } else { // -- (b) - nextBatch(); + nextBatch(NEW_RECORDS_POLL_TIMEOUT); if (!curBatch.hasNext()) { return false;
[2/2] incubator-beam git commit: Closes #453
Closes #453 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f4809446 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f4809446 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f4809446 Branch: refs/heads/master Commit: f4809446b931c02e1dc5da0d86f01faf00b53581 Parents: 3ff98ea 15f69ed Author: Dan Halperin Authored: Tue Jun 21 23:28:03 2016 -0700 Committer: Dan Halperin Committed: Tue Jun 21 23:28:03 2016 -0700 -- .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 11 --- 1 file changed, 8 insertions(+), 3 deletions(-) --
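The KafkaIO change keeps a single fetch path and only varies the timeout: a generous wait inside start(), a short one inside advance(). The pattern can be sketched in Python, with a plain queue standing in for KafkaIO's consumer-pool handoff (the class and names here are illustrative, not the actual KafkaIO API):

```python
import queue

START_POLL_TIMEOUT = 5.0     # generous wait in start(), like START_NEW_RECORDS_POLL_TIMEOUT
ADVANCE_POLL_TIMEOUT = 0.01  # short wait in advance(), like NEW_RECORDS_POLL_TIMEOUT

class BatchReader:
    """Toy reader: batches of records arrive on a queue from a consumer thread."""

    def __init__(self, available_records):
        self.available_records = available_records
        self.cur_batch = iter(())
        self.current = None

    def _next_batch(self, timeout):
        # Block up to `timeout` seconds for the next batch; empty on timeout.
        try:
            self.cur_batch = iter(self.available_records.get(timeout=timeout))
        except queue.Empty:
            self.cur_batch = iter(())

    def start(self):
        # Wait longer than normal for the first batch so a record is likely
        # available by the time start() returns.
        self._next_batch(START_POLL_TIMEOUT)
        return self.advance()

    def advance(self):
        for record in self.cur_batch:
            self.current = record
            return True
        # Current batch exhausted: briefly try to fetch another one.
        self._next_batch(ADVANCE_POLL_TIMEOUT)
        for record in self.cur_batch:
            self.current = record
            return True
        return False
```

The long timeout is paid at most once, at startup, while the steady-state advance() path stays cheap.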
[GitHub] incubator-beam pull request #507: [BEAM-360] Implements a framework for deve...
Github user chamikaramj closed the pull request at: https://github.com/apache/incubator-beam/pull/507 ---
[jira] [Commented] (BEAM-360) Add a framework for creating Python-SDK sources for new file types
[ https://issues.apache.org/jira/browse/BEAM-360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343782#comment-15343782 ] ASF GitHub Bot commented on BEAM-360: - Github user chamikaramj closed the pull request at: https://github.com/apache/incubator-beam/pull/507 > Add a framework for creating Python-SDK sources for new file types > -- > > Key: BEAM-360 > URL: https://issues.apache.org/jira/browse/BEAM-360 > Project: Beam > Issue Type: New Feature > Components: sdk-py >Reporter: Chamikara Jayalath >Assignee: Chamikara Jayalath > > We already have a framework for creating new sources for Beam Python SDK - > https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/iobase.py#L326 > It would be great if we can add a framework on top of this that encapsulates > logic common to sources that are based on files. This framework can include > following features that are common to sources based on files. > (1) glob expansion > (2) support for new file-systems > (3) dynamic work rebalancing based on byte offsets > (4) support for reading compressed files. > Java SDK has a similar framework and it's available at - > https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java