[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393774&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393774 ]

ASF GitHub Bot logged work on BEAM-9056:
----------------------------------------
Author: ASF GitHub Bot
Created on: 26/Feb/20 21:59
Start Date: 26/Feb/20 21:59
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #10621: [BEAM-9056] Staging artifacts from environment
URL: https://github.com/apache/beam/pull/10621#discussion_r384675886

## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
## @@ -175,6 +197,90 @@ public static Environment createProcessEnvironment(
     }
   }

+  public static Collection<String> getArtifacts(PipelineOptions options) {
+    Set<String> pathsToStage = Sets.newHashSet();
+    // TODO(heejong): remove jar_packages experimental flag when cross-language dependency
+    // management is implemented for all runners.
+    List<String> experiments = options.as(ExperimentalOptions.class).getExperiments();
+    if (experiments != null) {
+      Optional<String> jarPackages =
+          experiments.stream()
+              .filter((String flag) -> flag.startsWith("jar_packages="))
+              .findFirst();
+      jarPackages.ifPresent(
+          s -> pathsToStage.addAll(Arrays.asList(s.replaceFirst("jar_packages=", "").split(","))));
+    }
+    List<String> stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage();
+    if (stagingFiles == null) {
+      pathsToStage.addAll(
+          detectClassPathResourcesToStage(Environments.class.getClassLoader(), options));
+      if (pathsToStage.isEmpty()) {
+        throw new IllegalArgumentException("No classpath elements found.");
+      }
+      LOG.debug(
+          "PortablePipelineOptions.filesToStage was not specified. "
+              + "Defaulting to files from the classpath: {}",
+          pathsToStage.size());
+    } else {
+      pathsToStage.addAll(stagingFiles);
+    }
+
+    ImmutableList.Builder<String> filesToStage = ImmutableList.builder();
+    for (String path : pathsToStage) {
+      File file = new File(path);
+      if (new File(path).exists()) {
+        // Spurious items get added to the classpath. Filter by just those that exist.
+        if (file.isDirectory()) {
+          // Zip up directories so we can upload them to the artifact service.

Review comment: Is this the behavior expected by Beam runners (zipping up directories)?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------
Worklog Id: (was: 393774)
Time Spent: 3h (was: 2h 50m)

> Staging artifacts from environment
> ----------------------------------
>
>                 Key: BEAM-9056
>                 URL: https://issues.apache.org/jira/browse/BEAM-9056
>             Project: Beam
>          Issue Type: Sub-task
>          Components: java-fn-execution
>            Reporter: Heejong Lee
>            Assignee: Heejong Lee
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> Staging artifacts from artifact information embedded in the environment proto.
> Detail: https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
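The Java snippet under review gathers staging paths from a `jar_packages=` experiment flag plus the configured files-to-stage, keeps only paths that actually exist, and zips up directories so they can be uploaded to the artifact service. A rough stdlib-only Python sketch of that flow (the function name and temp-file handling here are illustrative, not Beam API):

```python
import os
import tempfile
import zipfile


def gather_artifacts(experiments, files_to_stage):
    """Illustrative mirror of the Java getArtifacts() logic quoted above."""
    paths = set()
    # Pull comma-separated paths out of a "jar_packages=a.jar,b.jar" experiment flag.
    for flag in experiments or []:
        if flag.startswith("jar_packages="):
            paths.update(flag.replace("jar_packages=", "", 1).split(","))
            break
    paths.update(files_to_stage or [])

    staged = []
    for path in sorted(paths):
        if not os.path.exists(path):
            continue  # Spurious classpath items: keep only paths that exist.
        if os.path.isdir(path):
            # Zip up directories so they can be uploaded to the artifact service.
            fd, zip_path = tempfile.mkstemp(suffix=".zip")
            os.close(fd)
            with zipfile.ZipFile(zip_path, "w") as zf:
                for root, _, files in os.walk(path):
                    for name in files:
                        full = os.path.join(root, name)
                        zf.write(full, os.path.relpath(full, path))
            staged.append(zip_path)
        else:
            staged.append(path)
    return staged
```

The directory-zipping step is exactly what the review comment questions: whether runners expect staged directories to arrive as zip archives.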
[jira] [Resolved] (BEAM-8458) BigQueryIO.Read needs permissions to create datasets to be able to run queries
[ https://issues.apache.org/jira/browse/BEAM-8458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Israel Herraiz resolved BEAM-8458. -- Fix Version/s: Not applicable Resolution: Fixed > BigQueryIO.Read needs permissions to create datasets to be able to run queries > -- > > Key: BEAM-8458 > URL: https://issues.apache.org/jira/browse/BEAM-8458 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Israel Herraiz >Assignee: Israel Herraiz >Priority: Major > Fix For: Not applicable > > Time Spent: 4h 40m > Remaining Estimate: 0h > > When using {{fromQuery}}, BigQueryIO creates a temp dataset to store the > results of the query. > Therefore, Beam requires permissions to create datasets just to be able to > run a query. In practice, this means that Beam requires the role > bigQuery.User just to run queries, whereas if you use {{from}} (to read from > a table), the role bigQuery.jobUser suffices. > BigQueryIO.Read should have an option to set an existing dataset to write > the temp results of > a query, so it would be enough with having the role bigQuery.jobUser. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8989) Backwards incompatible change in ParDo.getSideInputs (caught by failure when running Apache Nemo quickstart)
[ https://issues.apache.org/jira/browse/BEAM-8989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045959#comment-17045959 ]

Ismaël Mejía commented on BEAM-8989:
------------------------------------
This is not an error in the Nemo runner. The mistake here is that we changed the signature of a public method in ParDo.java, from `public List<PCollectionView<?>> getSideInputs()` to `public Map<String, PCollectionView<?>> getSideInputs()`, so we have two options: (1) re-introduce the old signature (and rename the Map-based one), or (2) add this information to the release notes under backwards-incompatible changes. Notice that this happened originally in release 2.16.0 but was never announced, because it was only discovered 'recently' thanks to the Nemo runner quickstart. I tend to prefer (2) even if it is less user-friendly; in this case Nemo should be updated, but since the runner is not in Beam's repo it is not blocking for us.

> Backwards incompatible change in ParDo.getSideInputs (caught by failure when
> running Apache Nemo quickstart)
>
>                 Key: BEAM-8989
>                 URL: https://issues.apache.org/jira/browse/BEAM-8989
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0
>            Reporter: Luke Cwik
>            Assignee: Reuven Lax
>            Priority: Critical
>             Fix For: 2.20.0
>
> [PR/9275|https://github.com/apache/beam/pull/9275] changed
> *ParDo.getSideInputs* from *List<PCollectionView<?>>* to
> *Map<String, PCollectionView<?>>*, which is a backwards-incompatible change and
> was released as part of Beam 2.16.0 erroneously.
> Running the Apache Nemo Quickstart fails with: > > {code:java} > Exception in thread "main" java.lang.RuntimeException: Translator private > static void > org.apache.nemo.compiler.frontend.beam.PipelineTranslator.parDoMultiOutputTranslator(org.apache.nemo.compiler.frontend.beam.PipelineTranslationContext,org.apache.beam.sdk.runners.TransformHierarchy$Node,org.apache.beam.sdk.transforms.ParDo$MultiOutput) > have failed to translate > org.apache.beam.examples.WordCount$ExtractWordsFn@600b9d27Exception in thread > "main" java.lang.RuntimeException: Translator private static void > org.apache.nemo.compiler.frontend.beam.PipelineTranslator.parDoMultiOutputTranslator(org.apache.nemo.compiler.frontend.beam.PipelineTranslationContext,org.apache.beam.sdk.runners.TransformHierarchy$Node,org.apache.beam.sdk.transforms.ParDo$MultiOutput) > have failed to translate > org.apache.beam.examples.WordCount$ExtractWordsFn@600b9d27 at > org.apache.nemo.compiler.frontend.beam.PipelineTranslator.translatePrimitive(PipelineTranslator.java:113) > at > org.apache.nemo.compiler.frontend.beam.PipelineVisitor.visitPrimitiveTransform(PipelineVisitor.java:46) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317) > at > org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251) > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460) at > org.apache.nemo.compiler.frontend.beam.NemoRunner.run(NemoRunner.java:80) at > org.apache.nemo.compiler.frontend.beam.NemoRunner.run(NemoRunner.java:31) at > 
org.apache.beam.sdk.Pipeline.run(Pipeline.java:315) at > org.apache.beam.sdk.Pipeline.run(Pipeline.java:301) at > org.apache.beam.examples.WordCount.runWordCount(WordCount.java:185) at > org.apache.beam.examples.WordCount.main(WordCount.java:192)Caused by: > java.lang.reflect.InvocationTargetException at > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.nemo.compiler.frontend.beam.PipelineTranslator.translatePrimitive(PipelineTranslator.java:109) > ... 14 moreCaused by: java.lang.NoSuchMethodError: > org.apache.beam.sdk.transforms.ParDo$MultiOutput.getSideInputs()Ljava/util/List; > at > org.apache.nemo.compiler.frontend.beam.PipelineTranslator.parDoMultiOutputTranslator(PipelineTranslator.java:236) > ... 19 more{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
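Option (1) from the comment above — re-introducing the old list-returning signature while renaming the new map-based accessor — is a standard compatibility-shim pattern. A minimal sketch of the shape (in Python for brevity; the real fix would be in ParDo.java, and the class/method names here are illustrative, not Beam's actual API):

```python
class MultiOutput:
    """Sketch of a compatibility shim: keep the old accessor's signature while
    adding a renamed accessor for the new representation."""

    def __init__(self, side_inputs):
        # New canonical representation: tag -> view
        # (mirrors Map<String, PCollectionView<?>>).
        self._side_inputs = dict(side_inputs)

    def get_side_inputs_map(self):
        # New, renamed accessor exposing the map (option 1's rename).
        return self._side_inputs

    def get_side_inputs(self):
        # Old accessor kept with its original list-returning shape
        # (mirrors List<PCollectionView<?>>), so existing callers such as the
        # Nemo translator would keep linking against it.
        return list(self._side_inputs.values())
```

The `NoSuchMethodError` in the stack trace is precisely what this shim would avoid: the JVM links callers against the exact return type, so changing it breaks compiled downstream code even when source still compiles.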
[jira] [Resolved] (BEAM-8460) Portable Flink runner fails UsesStrictTimerOrdering category tests
[ https://issues.apache.org/jira/browse/BEAM-8460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver resolved BEAM-8460. --- Fix Version/s: Not applicable Resolution: Fixed > Portable Flink runner fails UsesStrictTimerOrdering category tests > -- > > Key: BEAM-8460 > URL: https://issues.apache.org/jira/browse/BEAM-8460 > Project: Beam > Issue Type: Bug > Components: runner-flink >Affects Versions: 2.17.0 >Reporter: Jan Lukavský >Assignee: Kyle Weaver >Priority: Major > Fix For: Not applicable > > Time Spent: 0.5h > Remaining Estimate: 0h > > BEAM-7520 introduced new set of validatesRunner tests that test that timers > are fired exactly in order of increasing timestamp. Portable Flink runner > fails these added tests (are currently ignored). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-8460) Portable Flink runner ignores UsesStrictTimerOrdering category tests
[ https://issues.apache.org/jira/browse/BEAM-8460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver updated BEAM-8460: -- Summary: Portable Flink runner ignores UsesStrictTimerOrdering category tests (was: Portable Flink runner fails UsesStrictTimerOrdering category tests) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (BEAM-8460) Portable Flink runner fails UsesStrictTimerOrdering category tests
[ https://issues.apache.org/jira/browse/BEAM-8460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver reopened BEAM-8460: --- -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8591) Exception is thrown while running Beam Pipeline on Kubernetes Flink Cluster.
[ https://issues.apache.org/jira/browse/BEAM-8591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver resolved BEAM-8591. --- Fix Version/s: Not applicable Resolution: Not A Problem > Exception is thrown while running Beam Pipeline on Kubernetes Flink Cluster. > > > Key: BEAM-8591 > URL: https://issues.apache.org/jira/browse/BEAM-8591 > Project: Beam > Issue Type: Bug > Components: runner-flink >Reporter: Mingliang Gong >Assignee: Kyle Weaver >Priority: Major > Fix For: Not applicable > > > h2. Setup Clusters > * Setup Local Flink Cluster: > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/tutorials/local_setup.html] > * Setup Kubernetes Flink Cluster using Minikube: > [https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/deployment/kubernetes.html] > h2. Verify Clusters > Execute command “./bin/flink run examples/streaming/WordCount.jar”. Both > Local and K8S Flink Cluster work fine. > h2. Using Apache Beam Flink Runner > Instruction: [https://beam.apache.org/documentation/runners/flink/] > Sample Pipeline Code: > {code:java} > import apache_beam as beam > from apache_beam.options.pipeline_options import PipelineOptions > options = PipelineOptions([ > "--runner=PortableRunner", > "--job_endpoint=localhost:8099", > "--environment_type=LOOPBACK" > ]) > with beam.Pipeline(options=options) as pipeline: > data = ["Sample data", > "Sample data - 0", > "Sample data - 1"] > raw_data = (pipeline > | 'CreateHardCodeData' >> beam.Create(data) > | 'Map' >> beam.Map(lambda line : line + '.') > | 'Print' >> beam.Map(print)){code} > Verify different environment_type in Python SDK Harness Configuration > *environment_type=LOOPBACK* > # Run pipeline on local cluster: Works Fine > # Run pipeline on K8S cluster, Exceptions are thrown: > java.lang.Exception: The user defined 'open()' method caused an exception: > org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: > UNAVAILABLE: io exception Caused by: > 
org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.AbstractChannel$AnnotatedConnectException: > Connection refused: localhost/127.0.0.1:51017 > *environment_type=DOCKER* > # Run pipeline on local cluster: Works fine > # Run pipeline on K8S cluster, Exceptions are thrown: > Caused by: java.io.IOException: Cannot run program "docker": error=2, No > such file or directory. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-6441) cut_release_branch.sh should not push to master without verification and a PR
[ https://issues.apache.org/jira/browse/BEAM-6441?focusedWorklogId=393691&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393691 ] ASF GitHub Bot logged work on BEAM-6441: Author: ASF GitHub Bot Created on: 26/Feb/20 18:55 Start Date: 26/Feb/20 18:55 Worklog Time Spent: 10m Work Description: stale[bot] commented on issue #8223: [BEAM-6441] Revise release branch cut instructions URL: https://github.com/apache/beam/pull/8223#issuecomment-591585140 This pull request is no longer marked as stale. Issue Time Tracking --- Worklog Id: (was: 393691) Time Spent: 3h (was: 2h 50m) > cut_release_branch.sh should not push to master without verification and a PR > - > > Key: BEAM-6441 > URL: https://issues.apache.org/jira/browse/BEAM-6441 > Project: Beam > Issue Type: Bug > Components: build-system >Reporter: Kenneth Knowles >Assignee: Kenneth Knowles >Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > Currently, the cut_release_branch.sh does many things: > - Edits files in place to update the version > - Makes a local commit > - Pushing the local commit to master > - Creates a new branch > - Edits files in place to update the version > - Pushes the release branch > I think all of this except the push to master are OK. It is possible that we > have something - website, examples, new places where the version is > hardcoded, etc, that get broken in this process. Moving from x-SNAPSHOT to > (x+1)-SNAPSHOT is easy to do in a pull request and safe. The release branch > creation does not need to be synchronized with this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-6441) cut_release_branch.sh should not push to master without verification and a PR
[ https://issues.apache.org/jira/browse/BEAM-6441?focusedWorklogId=393690&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393690 ] ASF GitHub Bot logged work on BEAM-6441: Author: ASF GitHub Bot Created on: 26/Feb/20 18:55 Start Date: 26/Feb/20 18:55 Worklog Time Spent: 10m Work Description: kennknowles commented on pull request #8223: [BEAM-6441] Revise release branch cut instructions URL: https://github.com/apache/beam/pull/8223 The current release branch cut script pushes to both the release branch and master without verification. The useful logic it encapsulates is simply the locations where we depend on the version. I have moved those into a `set_version.sh` script, and the remaining trivial steps are just git commands. Since you are currently doing the release, how does this sound?
Post-Commit Tests Status (on master branch): [standard Beam PR-template table of Jenkins build-status badges for the Go/Java/Python SDKs across runners; truncated in this archive]
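The useful logic being factored into `set_version.sh` is just rewriting the handful of places where the Beam version is hardcoded. A hedged sketch of that substitution in Python (the real script is shell, and the two file formats shown — a Maven-style `<version>` element and a `gradle.properties`-style `version=` line — are illustrative assumptions, not the script's actual file list):

```python
import re


def set_version(text, new_version):
    """Rewrite hardcoded version strings in a file's contents (illustrative)."""
    # Maven-style: <version>2.16.0-SNAPSHOT</version> -> <version>NEW</version>
    text = re.sub(
        r"(<version>)[\d.]+(-SNAPSHOT)?(</version>)",
        lambda m: m.group(1) + new_version + m.group(3),
        text)
    # Properties-style: a line "version=2.16.0-SNAPSHOT" -> "version=NEW"
    text = re.sub(r"(?m)^(version=).*$", r"\g<1>" + new_version, text)
    return text
```

Keeping this substitution in its own script means the branch-cut procedure itself reduces to ordinary, reviewable git commands, which is the point of the PR.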
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393693&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393693 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 18:56 Start Date: 26/Feb/20 18:56 Worklog Time Spent: 10m Work Description: aaltay commented on pull request #10899: [BEAM-8335] Background Caching job URL: https://github.com/apache/beam/pull/10899 Issue Time Tracking --- Worklog Id: (was: 393693) Time Spent: 74h (was: 73h 50m) > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Time Spent: 74h > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience. This will allow users to: > * Write and run a streaming job in IPython > * Automatically cache records from unbounded sources > * Add a replay experience that replays all cached records to simulate the > original pipeline execution > * Add controls to play/pause/stop/step individual elements from the cached > records > * Add ability to inspect/visualize unbounded PCollections -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393710&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393710 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 19:11 Start Date: 26/Feb/20 19:11 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10892: [BEAM-8335] Make TestStream to/from runner_api include the output_tags property. URL: https://github.com/apache/beam/pull/10892#discussion_r384692255

## File path: sdks/python/apache_beam/testing/test_stream.py
## @@ -133,15 +140,17 @@ def __eq__(self, other):
     return self.new_watermark == other.new_watermark and self.tag == other.tag

   def __hash__(self):
-    return hash(self.new_watermark)
+    return hash(str(self.new_watermark) + str(self.tag))

   def __lt__(self, other):
     return self.new_watermark < other.new_watermark

   def to_runner_api(self, unused_element_coder):
+    tag = 'None' if self.tag is None else self.tag

Review comment: This is a general problem with Python tag naming where there is ambiguity. It was worked around for Dataflow by using `out` for `None` and `out_` for `tag` until it was removed in #10971. Unfortunately this name mangling was applied inconsistently throughout the codebase, which led to arbitrary fix-ups.

Issue Time Tracking --- Worklog Id: (was: 393710) Time Spent: 74h 20m (was: 74h 10m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
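The `__hash__` change in the diff above makes the hash cover the same fields as `__eq__` once the tag participates in equality, which the hash/eq contract requires. A minimal standalone sketch (not the real TestStream event class) of the same idea, using a tuple rather than string concatenation — concatenation can collide, e.g. `str(1) + str("23")` and `str(12) + str("3")` both give `"123"`:

```python
class WatermarkEvent:
    """Minimal sketch of an event whose equality includes an output tag."""

    def __init__(self, new_watermark, tag=None):
        self.new_watermark = new_watermark
        self.tag = tag

    def __eq__(self, other):
        return (self.new_watermark == other.new_watermark
                and self.tag == other.tag)

    def __hash__(self):
        # Hash exactly the fields __eq__ compares; hashing the tuple avoids
        # the collisions that concatenating str() representations can produce.
        return hash((self.new_watermark, self.tag))

    def __lt__(self, other):
        # Ordering by watermark only, as in the diff above.
        return self.new_watermark < other.new_watermark
```

Without a tag-aware `__hash__`, two events that differ only in tag would land in the same set/dict slot and compare unequal, silently dropping or duplicating events in hash-based collections.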
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393709 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 19:11 Start Date: 26/Feb/20 19:11 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10892: [BEAM-8335] Make TestStream to/from runner_api include the output_tags property. URL: https://github.com/apache/beam/pull/10892#discussion_r384689184

## File path: sdks/python/apache_beam/runners/direct/test_stream_impl.py
## @@ -45,11 +46,69 @@ class _WatermarkController(PTransform):
   - If the instance receives an ElementEvent, it emits all specified elements
     to the Global Window with the event time set to the element's timestamp.
   """
+  def __init__(self, output_tag):
+    self.output_tag = output_tag
+
   def get_windowing(self, _):
     return core.Windowing(window.GlobalWindows())

   def expand(self, pcoll):
-    return pvalue.PCollection.from_(pcoll)
+    ret = pvalue.PCollection.from_(pcoll)
+    ret.tag = self.output_tag
+    return ret
+
+
+class _ExpandableTestStream(PTransform):
+  def __init__(self, test_stream):
+    self.test_stream = test_stream
+
+  def expand(self, pbegin):
+    """Expands the TestStream into the DirectRunner implementation.
+

Review comment:
```suggestion
```
(an empty GitHub suggestion, i.e. proposing removal of the blank line above)

Issue Time Tracking --- Worklog Id: (was: 393709) Time Spent: 74h 10m (was: 74h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393713&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393713 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 19:11 Start Date: 26/Feb/20 19:11 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10892: [BEAM-8335] Make TestStream to/from runner_api include the output_tags property. URL: https://github.com/apache/beam/pull/10892#discussion_r384703870 ## File path: sdks/python/apache_beam/testing/test_stream.py ## @@ -171,13 +184,20 @@ class TestStream(PTransform): time. After all of the specified elements are emitted, ceases to produce Review comment: State that if only the default output or only one output tag has been specified, a PCollection will be returned; otherwise, a dictionary of output name to PCollection. Issue Time Tracking --- Worklog Id: (was: 393713) Time Spent: 74h 40m (was: 74.5h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393712&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393712 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 19:11 Start Date: 26/Feb/20 19:11 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10892: [BEAM-8335] Make TestStream to/from runner_api include the output_tags property. URL: https://github.com/apache/beam/pull/10892#discussion_r384702356

## File path: sdks/python/apache_beam/testing/test_stream.py
## @@ -171,13 +184,20 @@ class TestStream(PTransform):
   time. After all of the specified elements are emitted, ceases to produce
   output.
   """
-  def __init__(self, coder=coders.FastPrimitivesCoder(), events=None):
+  def __init__(

Review comment: Please add pydoc comments mentioning the few important pieces: 1) specifying output_tags allows adding outputs that produce no events, and 2) output_tags must be a superset of the tags found in events, if events is specified.

Issue Time Tracking --- Worklog Id: (was: 393712) Time Spent: 74h 40m (was: 74.5h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393715=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393715 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 19:11 Start Date: 26/Feb/20 19:11 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10892: [BEAM-8335] Make TestStream to/from runner_api include the output_tags property. URL: https://github.com/apache/beam/pull/10892#discussion_r384699533 ## File path: sdks/python/apache_beam/testing/test_stream.py ## @@ -133,15 +140,17 @@ def __eq__(self, other): return self.new_watermark == other.new_watermark and self.tag == other.tag def __hash__(self): -return hash(self.new_watermark) +return hash(str(self.new_watermark) + str(self.tag)) def __lt__(self, other): return self.new_watermark < other.new_watermark def to_runner_api(self, unused_element_coder): +tag = 'None' if self.tag is None else self.tag return beam_runner_api_pb2.TestStreamPayload.Event( watermark_event=beam_runner_api_pb2.TestStreamPayload.Event. -AdvanceWatermark(new_watermark=self.new_watermark.micros // 1000)) +AdvanceWatermark( +new_watermark=self.new_watermark.micros // 1000, tag=tag)) Review comment: This should be done for ProcessingTimeEvent as well This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393715) Time Spent: 74h 50m (was: 74h 40m) > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Time Spent: 74h 50m > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience. This will allow users to: > * Write and run a streaming job in IPython > * Automatically cache records from unbounded sources > * Add a replay experience that replays all cached records to simulate the > original pipeline execution > * Add controls to play/pause/stop/step individual elements from the cached > records > * Add ability to inspect/visualize unbounded PCollections -- This message was sent by Atlassian Jira (v8.3.4#803005)
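The `__hash__` change quoted above combines the two fields by concatenating their string forms. A common alternative idiom, shown here only as a sketch of the design trade-off (this is not what the patch does, and the class below is a hypothetical stand-in for the event type), is to hash a tuple of the fields, which preserves field boundaries:

```python
class WatermarkEvent:
    """Toy stand-in for the event class; only __eq__/__hash__ matter here."""

    def __init__(self, new_watermark, tag=None):
        self.new_watermark = new_watermark
        self.tag = tag

    def __eq__(self, other):
        return (self.new_watermark == other.new_watermark
                and self.tag == other.tag)

    def __hash__(self):
        # hash(str(a) + str(b)) cannot tell ('ab', 'c') from ('a', 'bc');
        # hashing the tuple keeps the field boundary intact.
        return hash((self.new_watermark, self.tag))


# The concatenated forms of two distinct field pairs can be identical:
assert str('ab') + str('c') == str('a') + str('bc')
# ...but the tuples themselves remain distinct values:
assert ('ab', 'c') != ('a', 'bc')
```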
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393711=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393711 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 19:11 Start Date: 26/Feb/20 19:11 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10892: [BEAM-8335] Make TestStream to/from runner_api include the output_tags property. URL: https://github.com/apache/beam/pull/10892#discussion_r384698347 ## File path: sdks/python/apache_beam/testing/test_stream.py ## @@ -133,15 +140,21 @@ def __eq__(self, other): return self.new_watermark == other.new_watermark and self.tag == other.tag def __hash__(self): -return hash(self.new_watermark) +return hash(str(self.new_watermark) + str(self.tag)) def __lt__(self, other): return self.new_watermark < other.new_watermark def to_runner_api(self, unused_element_coder): +tag = 'None' if self.tag is None else self.tag + +# Assert that no prevision is lost. Review comment: ```suggestion # Assert that no precision is lost. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393711) Time Spent: 74.5h (was: 74h 20m) > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Time Spent: 74.5h > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience. 
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393714=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393714 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 26/Feb/20 19:11 Start Date: 26/Feb/20 19:11 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10892: [BEAM-8335] Make TestStream to/from runner_api include the output_tags property. URL: https://github.com/apache/beam/pull/10892#discussion_r384699002 ## File path: sdks/python/apache_beam/testing/test_stream.py ## @@ -133,15 +140,21 @@ def __eq__(self, other): return self.new_watermark == other.new_watermark and self.tag == other.tag def __hash__(self): -return hash(self.new_watermark) +return hash(str(self.new_watermark) + str(self.tag)) def __lt__(self, other): return self.new_watermark < other.new_watermark def to_runner_api(self, unused_element_coder): +tag = 'None' if self.tag is None else self.tag + +# Assert that no prevision is lost. +assert 1000 * ( +self.new_watermark.micros // 1000) == self.new_watermark.micros Review comment: nit: This would be clearer by checking `%` has no remainder This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393714) Time Spent: 74h 40m (was: 74.5h) > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Time Spent: 74h 40m > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience. 
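The assertion under discussion checks that a microsecond timestamp converts to milliseconds without losing precision. The reviewer's nit is that a modulo check reads more clearly; the two forms are equivalent, as this quick sketch (function names are illustrative) shows:

```python
def no_loss_muldiv(micros):
    # The form in the patch: round-trip through integer millis.
    return 1000 * (micros // 1000) == micros


def no_loss_mod(micros):
    # The reviewer's suggestion: no remainder when dividing by 1000.
    return micros % 1000 == 0


# The two checks agree on every input, including negatives, because
# Python's // and % are both floor-based and consistent with each other.
for micros in (0, 999, 1000, 123000, 123456, -5000, -999):
    assert no_loss_muldiv(micros) == no_loss_mod(micros)
```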
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393742=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393742 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 26/Feb/20 20:52 Start Date: 26/Feb/20 20:52 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591637525 Yes, I will be thrilled to review this : ) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393742) Time Spent: 1h 10m (was: 1h) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 1h 10m > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
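The motivation given for Avro loads -- that JSON cannot represent NaN or infinity float values -- is easy to demonstrate. Strict JSON (RFC 8259) has no literal for these values; Python's `json` module only emits them as a non-standard extension and rejects them in strict mode:

```python
import json

row = {'score': float('nan')}

# Strict serialization (what a standards-compliant consumer expects) fails:
try:
    json.dumps(row, allow_nan=False)
    strict_ok = True
except ValueError:
    strict_ok = False
assert not strict_ok

# The default emits the non-standard token NaN, which JSON parsers
# outside Python may reject:
assert json.dumps(row) == '{"score": NaN}'
```

Avro has no such restriction: its float/double fields carry IEEE 754 values, including NaN and +/-Infinity, which is one reason to prefer it for BigQuery file loads.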
[jira] [Work logged] (BEAM-8575) Add more Python validates runner tests
[ https://issues.apache.org/jira/browse/BEAM-8575?focusedWorklogId=393758=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393758 ] ASF GitHub Bot logged work on BEAM-8575: Author: ASF GitHub Bot Created on: 26/Feb/20 21:24 Start Date: 26/Feb/20 21:24 Worklog Time Spent: 10m Work Description: bumblebee-coming commented on issue #10951: [BEAM-8575] Modified the test to work for different runners. URL: https://github.com/apache/beam/pull/10951#issuecomment-591651589 R: @robertwb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393758) Time Spent: 56h 40m (was: 56.5h) > Add more Python validates runner tests > -- > > Key: BEAM-8575 > URL: https://issues.apache.org/jira/browse/BEAM-8575 > Project: Beam > Issue Type: Test > Components: sdk-py-core, testing >Reporter: wendy liu >Assignee: wendy liu >Priority: Major > Time Spent: 56h 40m > Remaining Estimate: 0h > > This is the umbrella issue to track the work of adding more Python tests to > improve test coverage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8458) BigQueryIO.Read needs permissions to create datasets to be able to run queries
[ https://issues.apache.org/jira/browse/BEAM-8458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045952#comment-17045952 ] Israel Herraiz commented on BEAM-8458: -- Pull request merged
> BigQueryIO.Read needs permissions to create datasets to be able to run queries
> --
>
> Key: BEAM-8458
> URL: https://issues.apache.org/jira/browse/BEAM-8458
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Reporter: Israel Herraiz
> Assignee: Israel Herraiz
> Priority: Major
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> When using {{fromQuery}}, BigQueryIO creates a temp dataset to store the results of the query.
> Therefore, Beam requires permissions to create datasets just to be able to run a query. In practice, this means that Beam requires the role bigQuery.User just to run queries, whereas if you use {{from}} (to read from a table), the role bigQuery.jobUser suffices.
> BigQueryIO.Read should have an option to set an existing dataset to write the temp results of a query, so it would be enough with having the role bigQuery.jobUser.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393841=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393841 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 23:35 Start Date: 26/Feb/20 23:35 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901#issuecomment-591699495 I see. Thanks for pointing that out. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393841) Time Spent: 2h 10m (was: 2h) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
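A simplified, self-contained model (not Beam's actual code path) of why the duplicate side input triggers this error: a singleton view materializes the side input and requires exactly one element, so registering the same side input twice -- as the `ConsumerTrackingPipelineVisitor` bug did -- yields two identical elements, exactly the failure shown in the traceback:

```python
def as_singleton(materialized):
    """Toy version of the singleton-view check in beam.pvalue."""
    head = list(materialized)
    if len(head) != 1:
        raise ValueError(
            'PCollection of size %d with more than one element accessed as '
            'a singleton view.' % len(head))
    return head[0]


path = 'gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f'

# One registration: the singleton view resolves fine.
assert as_singleton([path]) == path

# The visitor bug registered the same side input twice, so the runner saw
# two (identical) elements and the singleton check failed.
try:
    as_singleton([path, path])
    resolved = True
except ValueError:
    resolved = False
assert not resolved
```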
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393864=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393864 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 00:14 Start Date: 27/Feb/20 00:14 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591710321 Run Python 3.7 PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393864) Time Spent: 1h 40m (was: 1.5h) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 1h 40m > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-8435) Allow access to PaneInfo from Python DoFns
[ https://issues.apache.org/jira/browse/BEAM-8435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Bradshaw resolved BEAM-8435. --- Fix Version/s: 2.19.0 Resolution: Duplicate > Allow access to PaneInfo from Python DoFns > -- > > Key: BEAM-8435 > URL: https://issues.apache.org/jira/browse/BEAM-8435 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Robert Bradshaw >Assignee: Robert Bradshaw >Priority: Major > Fix For: 2.19.0 > > Time Spent: 2h > Remaining Estimate: 0h > > PaneInfoParam exists, but the plumbing to actually populate it at runtime was > never added. (Nor, clearly, were any tests...) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8575) Add more Python validates runner tests
[ https://issues.apache.org/jira/browse/BEAM-8575?focusedWorklogId=393867=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393867 ] ASF GitHub Bot logged work on BEAM-8575: Author: ASF GitHub Bot Created on: 27/Feb/20 00:26 Start Date: 27/Feb/20 00:26 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10951: [BEAM-8575] Modified the test to work for different runners. URL: https://github.com/apache/beam/pull/10951#discussion_r384848033 ## File path: sdks/python/apache_beam/transforms/combiners_test.py ## @@ -470,6 +470,9 @@ def test_combining_with_accumulation_mode_and_fanout(self): ts.add_elements([i]) ts.advance_watermark_to_infinity() +def is_early_firing(element, num_partitions): + return 0 if element < 15 else 1 Review comment: This was fixed last year. Looks like the bug was never closed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393867) Time Spent: 57h 20m (was: 57h 10m) > Add more Python validates runner tests > -- > > Key: BEAM-8575 > URL: https://issues.apache.org/jira/browse/BEAM-8575 > Project: Beam > Issue Type: Test > Components: sdk-py-core, testing >Reporter: wendy liu >Assignee: wendy liu >Priority: Major > Time Spent: 57h 20m > Remaining Estimate: 0h > > This is the umbrella issue to track the work of adding more Python tests to > improve test coverage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-3759) Add support for PaneInfo descriptor in Python SDK
[ https://issues.apache.org/jira/browse/BEAM-3759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Bradshaw resolved BEAM-3759. --- Fix Version/s: 2.19.0 Resolution: Fixed > Add support for PaneInfo descriptor in Python SDK > - > > Key: BEAM-3759 > URL: https://issues.apache.org/jira/browse/BEAM-3759 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Affects Versions: 2.3.0 >Reporter: Charles Chen >Assignee: Tanay Tummalapalli >Priority: Major > Fix For: 2.19.0 > > Time Spent: 3h 40m > Remaining Estimate: 0h > > The PaneInfo descriptor allows a user to determine which particular > triggering emitted a value. This allows the user to differentiate between > speculative (early), on-time (at end of window) and late value emissions > coming out of a GroupByKey. We should add support for this feature in the > Python SDK. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9381) Add display data to BoundedSourceSDF
[ https://issues.apache.org/jira/browse/BEAM-9381?focusedWorklogId=393892=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393892 ] ASF GitHub Bot logged work on BEAM-9381: Author: ASF GitHub Bot Created on: 27/Feb/20 01:34 Start Date: 27/Feb/20 01:34 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10968: [BEAM-9381] Adding display data to BoundedSource SDF URL: https://github.com/apache/beam/pull/10968#issuecomment-591731834 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393892) Time Spent: 1.5h (was: 1h 20m) > Add display data to BoundedSourceSDF > - > > Key: BEAM-9381 > URL: https://issues.apache.org/jira/browse/BEAM-9381 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393893=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393893 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 01:34 Start Date: 27/Feb/20 01:34 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591731930 Run Python 3.7 PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393893) Time Spent: 2h (was: 1h 50m) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 2h > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17046048#comment-17046048 ] Valentyn Tymofieiev commented on BEAM-8494: --- There is a discussion relevant to this issue[1]. [1] https://lists.apache.org/thread.html/rd070afcebff5c967ec3b25d1f7a77db5278992c1508082bf5f636acd%40%3Cdev.beam.apache.org%3E
> Python 3.8 Support
> --
>
> Key: BEAM-8494
> URL: https://issues.apache.org/jira/browse/BEAM-8494
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Valentyn Tymofieiev
> Priority: Major
>
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393934=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393934 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:14 Start Date: 27/Feb/20 03:14 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384892436 ## File path: sdks/python/apache_beam/runners/interactive/utils.py ## @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities to be used in Interactive Beam. +""" + +from __future__ import absolute_import + +import pandas as pd + +from apache_beam.typehints import typehints as th +from apache_beam.utils.windowed_value import WindowedValue + +COLUMN_PREFIX = 'el' + + +def parse_row_(el, element_type, depth): + elements = [] + columns = [] + + # Recurse if there are a known length of columns to parse into. 
+ if isinstance(element_type, (th.TupleHint.TupleConstraint)): +for index, t in enumerate(element_type._inner_types()): + underlying_columns, underlying_elements = parse_row_(el[index], t, + depth + 1) + column = '[{}]'.format(index) + if underlying_columns: +columns += [column + c for c in underlying_columns] + else: +columns += [column] + elements += underlying_elements + + # Don't make new columns for variable length types. + elif isinstance( + element_type, + (th.ListHint.ListConstraint, th.TupleHint.TupleSequenceConstraint)): +elements = [pd.array(el)] + + # For any other types, try to parse as a namedtuple, otherwise pass element + # through. + else: +fields = getattr(el, '_fields', None) +if fields: + columns = list(fields) + if depth > 0: +columns = ['[{}]'.format(f) for f in fields] + elements = [el._asdict()[f] for f in fields] +else: + elements = [el] + return columns, elements + + +def parse_row(el, element_type, include_window_info=True, prefix=COLUMN_PREFIX): + # Reify the WindowedValue data to the Dataframe if asked. + windowed = None + if isinstance(el, WindowedValue): +if include_window_info: + windowed = el +el = el.value + + # Parse the elements with the given type. + columns, elements = parse_row_(el, element_type, 0) + + # If there are no columns returned, there is only a single column of a + # primitive data type. + if not columns: +columns = [''] + + # Add the prefix to the columns that have an index. + for i in range(len(columns)): +if columns[i] == '' or columns[i][0] == '[': + columns[i] = prefix + columns[i] + + # Reify the windowed columns and do a best-effort casting into Pandas DTypes. + if windowed: +columns += ['event_time', 'windows', 'pane_info'] +elements += [ +windowed.timestamp.micros, windowed.windows, windowed.pane_info +] + return columns, elements + + +def pcoll_to_df( +elements, element_type, include_window_info=False, prefix=COLUMN_PREFIX): + """Parses the given elements into a Dataframe. 
+ + Each column name will be prefixed with `prefix` concatenated with the nested + index, e.g. for a Tuple[Tuple[int, str], int], the column names will be: + [prefix[0][0], prefix[0][1], prefix[0]]. This is subject to change. + """ + rows = [] + columns = [] + + for e in elements: +columns, row = parse_row(e, element_type, include_window_info, prefix) Review comment: Changed to inferring the schema once. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393934) Time Spent: 76h 20m (was: 76h 10m) > Add streaming
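The column-naming scheme the docstring describes can be sketched with a stdlib-only helper (the name `flatten_columns` is hypothetical; the PR's `parse_row` additionally handles type hints, namedtuples, variable-length types, and windowing metadata): nested tuples are flattened depth-first, and each leaf's column name is the prefix plus its chain of indices.

```python
def flatten_columns(el, prefix='el'):
    """Flatten a nested tuple into (column_names, values), depth-first."""
    if isinstance(el, tuple):
        cols, vals = [], []
        for i, sub in enumerate(el):
            # Extend the prefix with this position's index, e.g. el -> el[0].
            sub_cols, sub_vals = flatten_columns(sub, '%s[%d]' % (prefix, i))
            cols += sub_cols
            vals += sub_vals
        return cols, vals
    # Non-tuple leaves become a single column named by the full index chain.
    return [prefix], [el]


# For a Tuple[Tuple[int, str], int] element:
cols, vals = flatten_columns(((1, 'a'), 2))
assert cols == ['el[0][0]', 'el[0][1]', 'el[1]']
assert vals == [1, 'a', 2]
```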
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393933=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393933 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:14 Start Date: 27/Feb/20 03:14 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384892377 ## File path: sdks/python/apache_beam/runners/interactive/utils.py ## @@ -0,0 +1,112 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities to be used in Interactive Beam. +""" + +from __future__ import absolute_import + +import pandas as pd + +from apache_beam.typehints import typehints as th +from apache_beam.utils.windowed_value import WindowedValue + +COLUMN_PREFIX = 'el' + + +def parse_row_(el, element_type, depth): + elements = [] + columns = [] + + # Recurse if there are a known length of columns to parse into. 
+ if isinstance(element_type, (th.TupleHint.TupleConstraint)): +for index, t in enumerate(element_type._inner_types()): + underlying_columns, underlying_elements = parse_row_(el[index], t, + depth + 1) + column = '[{}]'.format(index) + if underlying_columns: +columns += [column + c for c in underlying_columns] + else: +columns += [column] + elements += underlying_elements + + # Don't make new columns for variable length types. + elif isinstance( + element_type, + (th.ListHint.ListConstraint, th.TupleHint.TupleSequenceConstraint)): +elements = [pd.array(el)] + + # For any other types, try to parse as a namedtuple, otherwise pass element + # through. + else: +fields = getattr(el, '_fields', None) +if fields: + columns = list(fields) + if depth > 0: +columns = ['[{}]'.format(f) for f in fields] + elements = [el._asdict()[f] for f in fields] +else: + elements = [el] + return columns, elements + + +def parse_row(el, element_type, include_window_info=True, prefix=COLUMN_PREFIX): + # Reify the WindowedValue data to the Dataframe if asked. + windowed = None + if isinstance(el, WindowedValue): +if include_window_info: + windowed = el +el = el.value + + # Parse the elements with the given type. + columns, elements = parse_row_(el, element_type, 0) + + # If there are no columns returned, there is only a single column of a + # primitive data type. + if not columns: +columns = [''] + + # Add the prefix to the columns that have an index. + for i in range(len(columns)): +if columns[i] == '' or columns[i][0] == '[': Review comment: This code was removed in a recent commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393933) Time Spent: 76h 10m (was: 76h) > Add streaming support to Interactive Beam > - > > Key: BEAM-8335 > URL: https://issues.apache.org/jira/browse/BEAM-8335 > Project: Beam > Issue Type: Improvement > Components: runner-py-interactive >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Time Spent: 76h 10m > Remaining Estimate: 0h > > This issue tracks the work items to introduce streaming support to the > Interactive Beam experience. This will allow users to: > * Write and run a streaming job in IPython > * Automatically cache records from unbounded sources > * Add a replay experience that replays all cached records to simulate the > original pipeline execution > * Add controls to play/pause/stop/step individual
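The column-naming scheme being reviewed above — recursing through nested tuples and building bracketed index labels — can be sketched without Beam's type-hint machinery. The following is a simplified stand-in over plain Python values; `flatten_row` is a hypothetical name for illustration, not part of the Beam API:

```python
def flatten_row(el, depth=0):
    """Flatten a nested tuple into (columns, values), naming each column
    by its nested index, e.g. '[0][1]' for el[0][1]."""
    columns, values = [], []
    if isinstance(el, tuple) and not hasattr(el, '_fields'):
        # Fixed-width tuple: recurse into each position.
        for index, item in enumerate(el):
            sub_cols, sub_vals = flatten_row(item, depth + 1)
            label = '[{}]'.format(index)
            if sub_cols:
                columns += [label + c for c in sub_cols]
            else:
                columns += [label]
            values += sub_vals
    else:
        # Leaf value: no column of its own at this level.
        values = [el]
    return columns, values

cols, vals = flatten_row(((1, 'a'), 2))
# cols == ['[0][0]', '[0][1]', '[1]'], vals == [1, 'a', 2]
```

Because a fixed-width nested tuple yields exactly one column per leaf, this kind of flattening gives the interactive runner a stable column set to build a DataFrame from, which is presumably why variable-length types are kept as single array-valued columns instead.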
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393937&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393937 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:17 Start Date: 27/Feb/20 03:17 Worklog Time Spent: 10m Work Description: rohdesamuel commented on issue #10497: [BEAM-8335] Add the ReverseTestStream URL: https://github.com/apache/beam/pull/10497#issuecomment-591758520 Hey Robert, I think this is good to review. Please review for content, I haven't had the time to add better comments yet (also still needs formatting). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393937) Time Spent: 76h 40m (was: 76.5h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393842=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393842 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 23:36 Start Date: 26/Feb/20 23:36 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393842) Time Spent: 2h 20m (was: 2h 10m) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
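The ValueError in the report above is raised by Beam's side-input machinery when a PCollection viewed as a singleton contains more than one element per window. A minimal standalone sketch of that invariant (illustrative only — `as_singleton` here is not the Beam implementation, and the path is hypothetical):

```python
def as_singleton(pcoll_contents):
    """Mimic Beam's singleton side-input check: a PCollection viewed as a
    singleton must hold exactly one element per window."""
    if not pcoll_contents:
        raise ValueError('Empty PCollection accessed as a singleton view.')
    if len(pcoll_contents) > 1:
        raise ValueError(
            'PCollection of size {} with more than one element accessed as a '
            'singleton view.'.format(len(pcoll_contents)))
    return pcoll_contents[0]

# A single temp-file path tracked twice (a duplicate side input) trips the
# check even though the "two" elements are identical:
duplicated = ['gs://bucket/temp/file', 'gs://bucket/temp/file']
```

The error message quoting the same `bq_load` temp-file path twice is consistent with the pull request's fix of deduplicating side inputs in ConsumerTrackingPipelineVisitor: the same view was being materialized twice, producing a two-element "singleton".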
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393932&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393932 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:13 Start Date: 27/Feb/20 03:13 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384892342

## File path: sdks/python/apache_beam/runners/interactive/utils.py
+  # Reify the windowed columns and do a best-effort casting into Pandas DTypes.
+  if windowed:
+    columns += ['event_time', 'windows', 'pane_info']
+    elements += [
+        windowed.timestamp.micros, windowed.windows, windowed.pane_info
+    ]
+  return columns, elements
+
+
+def pcoll_to_df(
+    elements, element_type, include_window_info=False, prefix=COLUMN_PREFIX):
+  """Parses the given elements into a Dataframe.
+
+  Each column name will be prefixed with `prefix` concatenated with the nested
+  index, e.g. for a Tuple[Tuple[int, str], int], the column names will be:
+  [prefix[0][0], prefix[0][1], prefix[1]]. This is subject to change.

Review comment: ack, changed to dots (much simpler) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393932) Time Spent: 76h (was: 75h 50m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393930&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393930 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:13 Start Date: 27/Feb/20 03:13 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384892102

## File path: sdks/python/apache_beam/runners/interactive/utils.py
+  # For any other types, try to parse as a namedtuple, otherwise pass element
+  # through.
+  else:
+    fields = getattr(el, '_fields', None)

Review comment: Unfortunately this is the way to check if it is a named tuple. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393930) Time Spent: 75h 40m (was: 75.5h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
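As the review comment notes, probing for a `_fields` attribute is the conventional duck-typing test for a namedtuple instance, since namedtuple classes share no base class beyond `tuple`. A small self-contained illustration (`is_namedtuple_instance` is an illustrative helper, not a Beam function):

```python
from collections import namedtuple

def is_namedtuple_instance(value):
    """Duck-typing check: namedtuple instances are tuples that carry a
    `_fields` attribute naming their columns."""
    return isinstance(value, tuple) and getattr(value, '_fields', None) is not None

Point = namedtuple('Point', ['x', 'y'])
assert is_namedtuple_instance(Point(1, 2))
assert not is_namedtuple_instance((1, 2))   # plain tuple has no _fields
assert not is_namedtuple_instance([1, 2])   # not a tuple at all
```

The extra `isinstance(value, tuple)` guard keeps arbitrary objects that happen to define `_fields` from being misclassified, which the bare `getattr` check in the quoted code would accept.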
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393931&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393931 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:13 Start Date: 27/Feb/20 03:13 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384892289

## File path: sdks/python/apache_beam/runners/interactive/utils.py
+    if fields:
+      columns = list(fields)
+      if depth > 0:
+        columns = ['[{}]'.format(f) for f in fields]
+      elements = [el._asdict()[f] for f in fields]

Review comment: This code was removed in a recent commit, but ack on getattr being more natural This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393931) Time Spent: 75h 50m (was: 75h 40m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393927&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393927 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:12 Start Date: 27/Feb/20 03:12 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384891918

## File path: sdks/python/apache_beam/runners/interactive/utils.py
+def parse_row_(el, element_type, depth):

Review comment: Ack, I changed the name to just parse_row (because I use it in tests). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393927) Time Spent: 75h 20m (was: 75h 10m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8335) Add streaming support to Interactive Beam
[ https://issues.apache.org/jira/browse/BEAM-8335?focusedWorklogId=393928&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393928 ] ASF GitHub Bot logged work on BEAM-8335: Author: ASF GitHub Bot Created on: 27/Feb/20 03:12 Start Date: 27/Feb/20 03:12 Worklog Time Spent: 10m Work Description: rohdesamuel commented on pull request #10915: [BEAM-8335] Add PCollection to DataFrame logic for InteractiveRunner. URL: https://github.com/apache/beam/pull/10915#discussion_r384891956

## File path: sdks/python/apache_beam/runners/interactive/utils.py
+COLUMN_PREFIX = 'el'

Review comment: Ack, changed to elt This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393928) Time Spent: 75.5h (was: 75h 20m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9258) [Python] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9258?focusedWorklogId=394007=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394007 ] ASF GitHub Bot logged work on BEAM-9258: Author: ASF GitHub Bot Created on: 27/Feb/20 07:49 Start Date: 27/Feb/20 07:49 Worklog Time Spent: 10m Work Description: mwalenia commented on issue #10961: [BEAM-9258] Add integration test for Cloud DLP URL: https://github.com/apache/beam/pull/10961#issuecomment-591830855 R: @aaltay cc: @kamilwu This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 394007) Time Spent: 3h 50m (was: 3h 40m) > [Python] PTransform that connects to Cloud DLP deidentification service > --- > > Key: BEAM-9258 > URL: https://issues.apache.org/jira/browse/BEAM-9258 > Project: Beam > Issue Type: Sub-task > Components: io-py-gcp >Reporter: Michał Walenia >Assignee: Michał Walenia >Priority: Major > Fix For: 2.20.0 > > Time Spent: 3h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9392) TestStream tests are all flaky
[ https://issues.apache.org/jira/browse/BEAM-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046227#comment-17046227 ] Pablo Estrada commented on BEAM-9392: - [~rohdesam] can you take a look please? > TestStream tests are all flaky > -- > > Key: BEAM-9392 > URL: https://issues.apache.org/jira/browse/BEAM-9392 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Pablo Estrada >Assignee: Sam Rohde >Priority: Major > > See: > [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9392) TestStream tests are all flaky
[ https://issues.apache.org/jira/browse/BEAM-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pablo Estrada updated BEAM-9392: Status: Open (was: Triage Needed) > TestStream tests are all flaky > -- > > Key: BEAM-9392 > URL: https://issues.apache.org/jira/browse/BEAM-9392 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Pablo Estrada >Priority: Major > > See: > [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9381) Add display data to BoundedSourceSDF
[ https://issues.apache.org/jira/browse/BEAM-9381?focusedWorklogId=393987=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393987 ] ASF GitHub Bot logged work on BEAM-9381: Author: ASF GitHub Bot Created on: 27/Feb/20 06:40 Start Date: 27/Feb/20 06:40 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10968: [BEAM-9381] Adding display data to BoundedSource SDF URL: https://github.com/apache/beam/pull/10968#issuecomment-591808874 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393987) Time Spent: 1h 40m (was: 1.5h) > Add display data to BoundedSourceSDF > - > > Key: BEAM-9381 > URL: https://issues.apache.org/jira/browse/BEAM-9381 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9392) TestStream tests are all flaky
Pablo Estrada created BEAM-9392: --- Summary: TestStream tests are all flaky Key: BEAM-9392 URL: https://issues.apache.org/jira/browse/BEAM-9392 Project: Beam Issue Type: Bug Components: test-failures Reporter: Pablo Estrada See: [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
Alex Van Boxel created BEAM-9394: Summary: DynamicMessage handling of empty map violates schema nullability Key: BEAM-9394 URL: https://issues.apache.org/jira/browse/BEAM-9394 Project: Beam Issue Type: Bug Components: extensions-java-protobuf Reporter: Alex Van Boxel Assignee: Alex Van Boxel Fix For: 2.20.0 DynamicMessage handling of empty map violates nullability. It should return an empty map at the Row level. Add tests for nullable map and array to verify behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
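The intended Row-level behaviour can be sketched outside of Beam. Because proto3 cannot distinguish an unset map field from an empty one, a non-nullable schema field should surface an empty mapping rather than None. The helper below is hypothetical — `row_value_for_map` is not the Beam API, just an illustration of the contract the issue describes:

```python
def row_value_for_map(proto_map_field, nullable=False):
    """Convert a proto map field to a row value. An unset/empty map on a
    non-nullable schema field stays an empty mapping; only a nullable
    field may collapse the empty case to None."""
    if proto_map_field:
        return dict(proto_map_field)
    return None if nullable else {}

assert row_value_for_map({}) == {}                 # bug fix: not None
assert row_value_for_map({'k': 1}) == {'k': 1}
assert row_value_for_map({}, nullable=True) is None
```

The same contract applies to repeated (array) fields, which is why the issue adds tests for both; per the linked pull request, only the map path had the incorrect behaviour.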
[jira] [Work logged] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
[ https://issues.apache.org/jira/browse/BEAM-9394?focusedWorklogId=394004&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394004 ] ASF GitHub Bot logged work on BEAM-9394: Author: ASF GitHub Bot Created on: 27/Feb/20 07:47 Start Date: 27/Feb/20 07:47 Worklog Time Spent: 10m Work Description: alexvanboxel commented on pull request #10984: [BEAM-9394] DynamicMessage handling of empty map violates schema null… URL: https://github.com/apache/beam/pull/10984 Fixed the handling of empty maps: it returned NULL, but should return an empty map in the Row. Added tests for Map and Array; only Map had the incorrect behaviour.
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=394005=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394005 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 07:47 Start Date: 27/Feb/20 07:47 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591830495 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 394005) Time Spent: 2h 10m (was: 2h) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 2h 10m > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9395) Support Complex Types when converting HCatRecords to Rows
[ https://issues.apache.org/jira/browse/BEAM-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rahul Patwari updated BEAM-9395: Description: org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema() only supports converting Primitive types of HCatRecords to Rows. It can be enhanced to support complex types i.e. List, Map, Struct. > Support Complex Types when converting HCatRecords to Rows > - > > Key: BEAM-9395 > URL: https://issues.apache.org/jira/browse/BEAM-9395 > Project: Beam > Issue Type: Improvement > Components: io-java-hcatalog >Reporter: Rahul Patwari >Priority: Major > > org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema() only supports > converting Primitive types of HCatRecords to Rows. It can be enhanced to > support complex types i.e. List, Map, Struct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
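The proposed enhancement amounts to dispatching on the HCatalog type category instead of handling only primitives. A hedged sketch of such a mapping follows; the category names and returned strings are illustrative descriptions, not real Beam calls, though `FieldType.array`, `FieldType.map`, and `FieldType.row` are the Beam schema factories such a mapping would likely use.

```java
public class Main {
    // Hypothetical mapping from an HCatalog type category to the Beam schema
    // FieldType a recursive toBeamSchema() could produce; the returned
    // strings only describe the factories, they are not Beam API calls.
    static String toBeamFieldType(String hcatCategory) {
        switch (hcatCategory) {
            case "PRIMITIVE": return "primitive FieldType (e.g. STRING, INT64)";
            case "LIST":      return "FieldType.array(elementType)";
            case "MAP":       return "FieldType.map(keyType, valueType)";
            case "STRUCT":    return "FieldType.row(nestedSchema)";
            default: throw new IllegalArgumentException("unknown category: " + hcatCategory);
        }
    }

    public static void main(String[] args) {
        System.out.println(toBeamFieldType("LIST"));
        System.out.println(toBeamFieldType("STRUCT"));
    }
}
```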
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=394006=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394006 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 07:48 Start Date: 27/Feb/20 07:48 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591809987 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 394006) Time Spent: 2h 20m (was: 2h 10m) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 2h 20m > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9384) Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types
[ https://issues.apache.org/jira/browse/BEAM-9384?focusedWorklogId=394012=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394012 ] ASF GitHub Bot logged work on BEAM-9384: Author: ASF GitHub Bot Created on: 27/Feb/20 07:56 Start Date: 27/Feb/20 07:56 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10974: [BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types URL: https://github.com/apache/beam/pull/10974#issuecomment-591833469 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 394012) Time Spent: 1h 10m (was: 1h) > Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types > -- > > Key: BEAM-9384 > URL: https://issues.apache.org/jira/browse/BEAM-9384 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > Time Spent: 1h 10m > Remaining Estimate: 0h > > PTransforms that are parameterized by types may be able to infer a valid > SchemaCoder for a given type from the SchemaRegistry (if a Schema for the > given type is available). This method will provide a unified place to do that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9384) Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types
[ https://issues.apache.org/jira/browse/BEAM-9384?focusedWorklogId=394013=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394013 ] ASF GitHub Bot logged work on BEAM-9384: Author: ASF GitHub Bot Created on: 27/Feb/20 07:56 Start Date: 27/Feb/20 07:56 Worklog Time Spent: 10m Work Description: iemejia commented on issue #10974: [BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types URL: https://github.com/apache/beam/pull/10974#issuecomment-591833541 Run JavaPortabilityApi PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 394013) Time Spent: 1h 20m (was: 1h 10m) > Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types > -- > > Key: BEAM-9384 > URL: https://issues.apache.org/jira/browse/BEAM-9384 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > Time Spent: 1h 20m > Remaining Estimate: 0h > > PTransforms that are parameterized by types may be able to infer a valid > SchemaCoder for a given type from the SchemaRegistry (if a Schema for the > given type is available). This method will provide a unified place to do that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9395) Support Complex Types when converting HCatRecords to Rows
Rahul Patwari created BEAM-9395: --- Summary: Support Complex Types when converting HCatRecords to Rows Key: BEAM-9395 URL: https://issues.apache.org/jira/browse/BEAM-9395 Project: Beam Issue Type: Improvement Components: io-java-hcatalog Reporter: Rahul Patwari -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9393) support schemas in state API
[ https://issues.apache.org/jira/browse/BEAM-9393?focusedWorklogId=393990=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393990 ] ASF GitHub Bot logged work on BEAM-9393: Author: ASF GitHub Bot Created on: 27/Feb/20 06:55 Start Date: 27/Feb/20 06:55 Worklog Time Spent: 10m Work Description: reuvenlax commented on pull request #10983: [BEAM-9393] Support schemas in state API URL: https://github.com/apache/beam/pull/10983 Add schema inference for types used in the state API. Add state overrides for Row types. Disable Coder inference for Row types, as we should only use Row with schemas. R: @dpmills This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393990) Remaining Estimate: 0h Time Spent: 10m > support schemas in state API > > > Key: BEAM-9393 > URL: https://issues.apache.org/jira/browse/BEAM-9393 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core >Reporter: Reuven Lax >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9393) support schemas in state API
Reuven Lax created BEAM-9393: Summary: support schemas in state API Key: BEAM-9393 URL: https://issues.apache.org/jira/browse/BEAM-9393 Project: Beam Issue Type: Sub-task Components: sdk-java-core Reporter: Reuven Lax -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
[ https://issues.apache.org/jira/browse/BEAM-9394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Van Boxel resolved BEAM-9394. -- Resolution: Fixed > DynamicMessage handling of empty map violates schema nullability > > > Key: BEAM-9394 > URL: https://issues.apache.org/jira/browse/BEAM-9394 > Project: Beam > Issue Type: Bug > Components: extensions-java-protobuf >Reporter: Alex Van Boxel >Assignee: Alex Van Boxel >Priority: Major > Fix For: 2.20.0 > > > DynamicMessage handling of empty map violates nullability. It should return > an empty map at the Row level. > Add tests for nullable map and array to verify behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9394) DynamicMessage handling of empty map violates schema nullability
[ https://issues.apache.org/jira/browse/BEAM-9394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Van Boxel updated BEAM-9394: - Status: Open (was: Triage Needed) > DynamicMessage handling of empty map violates schema nullability > > > Key: BEAM-9394 > URL: https://issues.apache.org/jira/browse/BEAM-9394 > Project: Beam > Issue Type: Bug > Components: extensions-java-protobuf >Reporter: Alex Van Boxel >Assignee: Alex Van Boxel >Priority: Major > Fix For: 2.20.0 > > > DynamicMessage handling of empty map violates nullability. It should return > an empty map at the Row level. > Add tests for nullable map and array to verify behaviour. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9395) Support Complex Types when converting HCatRecords to Rows
[ https://issues.apache.org/jira/browse/BEAM-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rahul Patwari updated BEAM-9395: Priority: Minor (was: Major) > Support Complex Types when converting HCatRecords to Rows > - > > Key: BEAM-9395 > URL: https://issues.apache.org/jira/browse/BEAM-9395 > Project: Beam > Issue Type: Improvement > Components: io-java-hcatalog >Reporter: Rahul Patwari >Priority: Minor > > org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema() only supports > converting Primitive types of HCatRecords to Rows. It can be enhanced to > support complex types i.e. List, Map, Struct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9384) Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types
[ https://issues.apache.org/jira/browse/BEAM-9384?focusedWorklogId=394008&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-394008 ] ASF GitHub Bot logged work on BEAM-9384: Author: ASF GitHub Bot Created on: 27/Feb/20 07:52 Start Date: 27/Feb/20 07:52 Worklog Time Spent: 10m Work Description: iemejia commented on pull request #10974: [BEAM-9384] Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types URL: https://github.com/apache/beam/pull/10974#discussion_r384960299 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/SchemaRegistry.java ## @@ -288,6 +281,38 @@ public void registerSchemaProvider(SchemaProvider schemaProvider) { return getProviderResult((SchemaProvider p) -> p.fromRowFunction(typeDescriptor)); } + /** + * Retrieve a {@link SchemaCoder} for a given {@link Class} type. If no schema exists, throws + * {@link NoSuchSchemaException}. + */ + public <T> SchemaCoder<T> getSchemaCoder(Class<T> clazz) throws NoSuchSchemaException { + return getSchemaCoder(TypeDescriptor.of(clazz)); + } + + /** + * Retrieve a {@link SchemaCoder} for a given {@link TypeDescriptor} type. If no schema exists, + * throws {@link NoSuchSchemaException}. + */ + public <T> SchemaCoder<T> getSchemaCoder(TypeDescriptor<T> typeDescriptor) + throws NoSuchSchemaException { + return SchemaCoder.of( + getSchema(typeDescriptor), + typeDescriptor, + getToRowFunction(typeDescriptor), + getFromRowFunction(typeDescriptor)); Review comment: I agree with you that `SchemaCoder` is an internal detail that regular users (authors of pipelines) should not care about. I felt tempted to mark this method as `@Internal`; however, PTransform authors (e.g. IO authors) will find it useful (as I did for the PR I mention above for KafkaIO schema support), so it is probably worth leaving available, and I cannot think of a better place to put this method than here. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 394008) Time Spent: 1h (was: 50m) > Add SchemaRegistry.getSchemaCoder to get SchemaCoders for registered types > -- > > Key: BEAM-9384 > URL: https://issues.apache.org/jira/browse/BEAM-9384 > Project: Beam > Issue Type: Improvement > Components: sdk-java-core >Reporter: Ismaël Mejía >Assignee: Ismaël Mejía >Priority: Minor > Time Spent: 1h > Remaining Estimate: 0h > > PTransforms that are parameterized by types may be able to infer a valid > SchemaCoder for a given type from the SchemaRegistry (if a Schema for the > given type is available). This method will provide a unified place to do that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
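The pattern behind `getSchemaCoder` — look up the registered schema pieces for a type and surface a single `NoSuchSchemaException` when none exist — can be sketched with a self-contained miniature registry. This is a hypothetical model, not the Beam API: schemas are reduced to strings, whereas the real registry assembles a Schema plus to-Row and from-Row functions into a SchemaCoder.

```java
import java.util.HashMap;
import java.util.Map;

public class Main {
    // Hypothetical stand-in for Beam's NoSuchSchemaException.
    static class NoSuchSchemaException extends Exception {}

    // Schemas are reduced to strings in this sketch.
    static final Map<Class<?>, String> schemas = new HashMap<>();

    static String getSchema(Class<?> clazz) throws NoSuchSchemaException {
        String schema = schemas.get(clazz);
        if (schema == null) {
            throw new NoSuchSchemaException(); // one failure mode for callers
        }
        return schema;
    }

    public static void main(String[] args) throws Exception {
        schemas.put(String.class, "fields: [value: STRING]");
        System.out.println(getSchema(String.class));
        try {
            getSchema(Integer.class); // nothing registered for Integer
        } catch (NoSuchSchemaException e) {
            System.out.println("no schema registered");
        }
    }
}
```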
[jira] [Assigned] (BEAM-9392) TestStream tests are all flaky
[ https://issues.apache.org/jira/browse/BEAM-9392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pablo Estrada reassigned BEAM-9392: --- Assignee: Sam Rohde > TestStream tests are all flaky > -- > > Key: BEAM-9392 > URL: https://issues.apache.org/jira/browse/BEAM-9392 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Pablo Estrada >Assignee: Sam Rohde >Priority: Major > > See: > [https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=393988=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393988 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 06:44 Start Date: 27/Feb/20 06:44 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591809987 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393988) Time Spent: 1h 50m (was: 1h 40m) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9295) Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10
[ https://issues.apache.org/jira/browse/BEAM-9295?focusedWorklogId=393989=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393989 ] ASF GitHub Bot logged work on BEAM-9295: Author: ASF GitHub Bot Created on: 27/Feb/20 06:44 Start Date: 27/Feb/20 06:44 Worklog Time Spent: 10m Work Description: sunjincheng121 commented on issue #10945: [BEAM-9295] Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 URL: https://github.com/apache/beam/pull/10945#issuecomment-591810030 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393989) Time Spent: 2h (was: 1h 50m) > Add Flink 1.10 build target and Make FlinkRunner compatible with Flink 1.10 > --- > > Key: BEAM-9295 > URL: https://issues.apache.org/jira/browse/BEAM-9295 > Project: Beam > Issue Type: New Feature > Components: runner-flink >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Apache Flink 1.10 has completed the final release vote, see [1]. So, I would > like to add Flink 1.10 build target and make Flink Runner compatible with > Flink 1.10. > And I appreciate it if you can leave your suggestions or comments! > [1] > https://lists.apache.org/thread.html/r97672d4d1e47372cebf23e6643a6cc30a06bfbdf3f277b0be3695b15%40%3Cdev.flink.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393909&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393909 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:10 Start Date: 27/Feb/20 02:10 Worklog Time Spent: 10m Work Description: udim commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591741341 I believe this change may have broken precommits: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393909) Time Spent: 1h 50m (was: 1h 40m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 1h 50m > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set manually on PCollections when > applying PTransforms when adding the PCollection to the PTransform > [outputs|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]. 
> In the > [add_output|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated correctly, which can be a problem when relying on the output > PCollection tags to match the user-set values. > The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag, then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids", set to False by default. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393910=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393910 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:10 Start Date: 27/Feb/20 02:10 Worklog Time Spent: 10m Work Description: udim commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591741341 This change may have broken precommits: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393910) Time Spent: 2h (was: 1h 50m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2h > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. > In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated to correctly which can be a problem on relying on the output > PCollection tags to match the user set values. 
> The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and be default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393919=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393919 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:30 Start Date: 27/Feb/20 02:30 Worklog Time Spent: 10m Work Description: rohdesamuel commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591747015 > I see this error in the logs: > > 17:01:54 > assert event_tags.issubset(self.output_tags) > 17:01:54 E AssertionError: assert False > 17:01:54 E + where False = (set([None, '1'])) > 17:01:54 E + where = set(['a', 'b']).issubset > 17:01:54 E + and set([None, '1']) = .output_tags > > @rohdesamuel could you take a look? Yep, taking a look This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393919) Time Spent: 2.5h (was: 2h 20m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2.5h > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. 
> In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated to correctly which can be a problem on relying on the output > PCollection tags to match the user set values. > The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and be default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393918=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393918 ] ASF GitHub Bot logged work on BEAM-9322: Author: ASF GitHub Bot Created on: 27/Feb/20 02:30 Start Date: 27/Feb/20 02:30 Worklog Time Spent: 10m Work Description: aaltay commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled URL: https://github.com/apache/beam/pull/10934#issuecomment-591746886 I see this error in the logs: 17:01:54 > assert event_tags.issubset(self.output_tags) 17:01:54 E AssertionError: assert False 17:01:54 E + where False = (set([None, '1'])) 17:01:54 E +where = set(['a', 'b']).issubset 17:01:54 E +and set([None, '1']) = .output_tags @rohdesamuel could you take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393918) Time Spent: 2h 20m (was: 2h 10m) > Python SDK ignores manually set PCollection tags > > > Key: BEAM-9322 > URL: https://issues.apache.org/jira/browse/BEAM-9322 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Sam Rohde >Assignee: Sam Rohde >Priority: Major > Fix For: 2.20.0 > > Time Spent: 2h 20m > Remaining Estimate: 0h > > The Python SDK currently ignores any tags set on PCollections manually when > applying PTransforms when adding the PCollection to the PTransform > [outputs|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595]]. 
> In the > [add_output|[https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]] > method, the tag is set to None for all PValues, meaning the output tags are > set to an enumeration index over the PCollection outputs. The tags are not > propagated to correctly which can be a problem on relying on the output > PCollection tags to match the user set values. > The fix is to correct BEAM-1833, and always pass in the tags. However, that > doesn't fix the problem for nested PCollections. If you have a dict of lists > of PCollections, what should their tags be correctly set to? In order to fix > this, first propagate the correct tag then talk with the community about the > best auto-generated tags. > Some users may rely on the old implementation, so a flag will be created: > "force_generated_pcollection_output_ids" and be default set to False. If > True, this will go to the old implementation and generate tags for > PCollections. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393860=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393860 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 00:04 Start Date: 27/Feb/20 00:04 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591707480 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393860) Time Spent: 1.5h (was: 1h 20m) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 1.5h > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
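The NaN/Infinity limitation mentioned above is easy to demonstrate: JSON has no literal for these values, so a naive textual rendering of the doubles produces tokens a strict JSON parser rejects, while Avro's binary encoding represents them without trouble.

```java
public class Main {
    public static void main(String[] args) {
        // JSON (RFC 8259) defines no literal for NaN or Infinity, so these
        // renderings cannot appear as numbers in a valid JSON document.
        System.out.println(Double.NaN);               // NaN
        System.out.println(Double.POSITIVE_INFINITY); // Infinity
    }
}
```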
[jira] [Work logged] (BEAM-8575) Add more Python validates runner tests
[ https://issues.apache.org/jira/browse/BEAM-8575?focusedWorklogId=393865=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393865 ] ASF GitHub Bot logged work on BEAM-8575: Author: ASF GitHub Bot Created on: 27/Feb/20 00:18 Start Date: 27/Feb/20 00:18 Worklog Time Spent: 10m Work Description: bumblebee-coming commented on pull request #10951: [BEAM-8575] Modified the test to work for different runners. URL: https://github.com/apache/beam/pull/10951#discussion_r384845501 ## File path: sdks/python/apache_beam/transforms/combiners_test.py ## @@ -470,6 +470,9 @@ def test_combining_with_accumulation_mode_and_fanout(self): ts.add_elements([i]) ts.advance_watermark_to_infinity() +def is_early_firing(element, num_partitions): + return 0 if element < 15 else 1 Review comment: PaneInfo is not supported yet in Python. https://issues.apache.org/jira/browse/BEAM-3759 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393865) Time Spent: 57h 10m (was: 57h) > Add more Python validates runner tests > -- > > Key: BEAM-8575 > URL: https://issues.apache.org/jira/browse/BEAM-8575 > Project: Beam > Issue Type: Test > Components: sdk-py-core, testing >Reporter: wendy liu >Assignee: wendy liu >Priority: Major > Time Spent: 57h 10m > Remaining Estimate: 0h > > This is the umbrella issue to track the work of adding more Python tests to > improve test coverage. -- This message was sent by Atlassian Jira (v8.3.4#803005)
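The `is_early_firing` helper in the diff is a partition function. Its effect can be sketched without Beam — `partition` below is a minimal stand-in for `beam.Partition`, written for illustration only:

```python
def is_early_firing(element, num_partitions):
    # Same predicate as in the test: bucket 0 for "early" elements.
    return 0 if element < 15 else 1

def partition(elements, fn, num_partitions=2):
    """Route each element to the bucket index returned by fn,
    mimicking what beam.Partition does with a PCollection."""
    buckets = [[] for _ in range(num_partitions)]
    for e in elements:
        buckets[fn(e, num_partitions)].append(e)
    return buckets

print(partition(range(10, 20), is_early_firing))
# [[10, 11, 12, 13, 14], [15, 16, 17, 18, 19]]
```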
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393881=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393881 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384851171 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ## @@ -261,14 +263,20 @@ public String registerCoder(Coder coder) throws IOException { * return the same unique ID. */ public String registerEnvironment(Environment env) { +String environmentId; String existing = environmentIds.get(env); if (existing != null) { - return existing; + environmentId = existing; +} else { + String name = uniqify(env.getUrn(), environmentIds.values()); + environmentIds.put(env, name); + componentsBuilder.putEnvironments(name, env); + environmentId = name; } -String name = uniqify(env.getUrn(), environmentIds.values()); -environmentIds.put(env, name); -componentsBuilder.putEnvironments(name, env); -return name; +if (defaultEnvironmentId == null) { Review comment: +1 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393881) Time Spent: 3.5h (was: 3h 20m) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3.5h > Remaining Estimate: 0h > > Staging artifacts from the artifact information embedded in the environment proto. > Detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
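The `registerEnvironment` hunk reviewed above makes registration idempotent while also recording the first registration as the default. The control flow can be sketched in Python (class and method names here are illustrative, not Beam's actual API):

```python
class EnvironmentRegistry:
    """Sketch of the reviewed SdkComponents.registerEnvironment logic:
    the same environment always maps to the same ID, and the first
    registered environment becomes the default."""

    def __init__(self):
        self._ids = {}          # environment -> assigned ID
        self.default_id = None

    def _uniqify(self, base):
        # Append a counter until the name is unused, like uniqify().
        name, n = base, 0
        while name in self._ids.values():
            n += 1
            name = f"{base}_{n}"
        return name

    def register(self, env):
        env_id = self._ids.get(env)
        if env_id is None:
            env_id = self._uniqify(env)
            self._ids[env] = env_id
        if self.default_id is None:   # mirrors the new defaultEnvironmentId branch
            self.default_id = env_id
        return env_id

reg = EnvironmentRegistry()
assert reg.register("docker") == reg.register("docker")  # idempotent
```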
[jira] [Resolved] (BEAM-8975) Add Thrift Parser for ThriftIO
[ https://issues.apache.org/jira/browse/BEAM-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Larsen resolved BEAM-8975. Fix Version/s: Not applicable Resolution: Won't Do Doesn't need to be implemented as ThriftIO was redesigned to handle Thrift encoded files instead of Thrift IDL files. > Add Thrift Parser for ThriftIO > -- > > Key: BEAM-8975 > URL: https://issues.apache.org/jira/browse/BEAM-8975 > Project: Beam > Issue Type: New Feature > Components: io-java-files >Reporter: Chris Larsen >Assignee: Chris Larsen >Priority: Minor > Fix For: Not applicable > > Time Spent: 1.5h > Remaining Estimate: 0h > > This ticket is related to > [BEAM-8561|https://issues.apache.org/jira/projects/BEAM/issues/BEAM-8561?filter=allissues]. > As there are a large number of files to review for the > [PR|https://github.com/apache/beam/pull/10290] for ThriftIO this ticket will > serve as the tracker for the submission of a PR relating to the parser and > document model that will be used by ThriftIO. The aim is to reduce the number > of files submitted with each PR. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Closed] (BEAM-8975) Add Thrift Parser for ThriftIO
[ https://issues.apache.org/jira/browse/BEAM-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Larsen closed BEAM-8975. -- > Add Thrift Parser for ThriftIO > -- > > Key: BEAM-8975 > URL: https://issues.apache.org/jira/browse/BEAM-8975 > Project: Beam > Issue Type: New Feature > Components: io-java-files >Reporter: Chris Larsen >Assignee: Chris Larsen >Priority: Minor > Fix For: Not applicable > > Time Spent: 1.5h > Remaining Estimate: 0h > > This ticket is related to > [BEAM-8561|https://issues.apache.org/jira/projects/BEAM/issues/BEAM-8561?filter=allissues]. > As there are a large number of files to review for the > [PR|https://github.com/apache/beam/pull/10290] for ThriftIO this ticket will > serve as the tracker for the submission of a PR relating to the parser and > document model that will be used by ThriftIO. The aim is to reduce the number > of files submitted with each PR. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393890=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393890 ] ASF GitHub Bot logged work on BEAM-8841: Author: ASF GitHub Bot Created on: 27/Feb/20 01:32 Start Date: 27/Feb/20 01:32 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK URL: https://github.com/apache/beam/pull/10979#issuecomment-591731506 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393890) Time Spent: 1h 50m (was: 1h 40m) > Add ability to perform BigQuery file loads using avro > - > > Key: BEAM-8841 > URL: https://issues.apache.org/jira/browse/BEAM-8841 > Project: Beam > Issue Type: Improvement > Components: io-py-gcp >Reporter: Chun Yang >Assignee: Chun Yang >Priority: Minor > Time Spent: 1h 50m > Remaining Estimate: 0h > > Currently, JSON format is used for file loads into BigQuery in the Python > SDK. JSON has some disadvantages including size of serialized data and > inability to represent NaN and infinity float values. > BigQuery supports loading files in avro format, which can overcome these > disadvantages. The Java SDK already supports loading files using avro format > (BEAM-2879) so it makes sense to support it in the Python SDK as well. > The change will be somewhere around > [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9391) Cleanup hardcoded unified worker images
Ankur Goenka created BEAM-9391: -- Summary: Cleanup hardcoded unified worker images Key: BEAM-9391 URL: https://issues.apache.org/jira/browse/BEAM-9391 Project: Beam Issue Type: Bug Components: sdk-py-harness, testing Reporter: Ankur Goenka Assignee: Ankur Goenka -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8487) Python typehints: support forward references
[ https://issues.apache.org/jira/browse/BEAM-8487?focusedWorklogId=393905=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393905 ] ASF GitHub Bot logged work on BEAM-8487: Author: ASF GitHub Bot Created on: 27/Feb/20 01:59 Start Date: 27/Feb/20 01:59 Worklog Time Spent: 10m Work Description: udim commented on pull request #10932: [BEAM-8487] Handle nested forward references URL: https://github.com/apache/beam/pull/10932#discussion_r384873795 ## File path: sdks/python/apache_beam/typehints/native_type_compatibility.py ## @@ -163,8 +163,14 @@ def is_any(typ): return typ is typing.Any +try: + _ForwardRef = typing.ForwardRef +except AttributeError: Review comment: done, also rebased This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393905) Time Spent: 1h 40m (was: 1.5h) > Python typehints: support forward references > > > Key: BEAM-8487 > URL: https://issues.apache.org/jira/browse/BEAM-8487 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > Typehints may be given as string literals: > https://www.python.org/dev/peps/pep-0484/#forward-references > These are currently not evaluated and result in errors. 
> Example 1: > {code} > def test_typed_callable_string_hints(self): > def do_fn(element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(do_fn) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > return issubclass(sub, base) > E TypeError: issubclass() arg 2 must be a class or tuple of classes > typehints.py:1168: TypeError > {code} > Example 2: > {code} > def test_typed_dofn_string_hints(self): > class MyDoFn(beam.DoFn): > def process(self, element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(MyDoFn()) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > raise ValueError('%s is not iterable' % type_hint) > E ValueError: typehints.List[str] is not iterable > typehints.py:1194: ValueError > {code} > where the non-iterable entity the error refers to is a string literal > ("typehints.List[str]"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8965) WriteToBigQuery failed in BundleBasedDirectRunner
[ https://issues.apache.org/jira/browse/BEAM-8965?focusedWorklogId=393843=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393843 ] ASF GitHub Bot logged work on BEAM-8965: Author: ASF GitHub Bot Created on: 26/Feb/20 23:36 Start Date: 26/Feb/20 23:36 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10901: [BEAM-8965] Remove duplicate sideinputs in ConsumerTrackingPipelineVisitor URL: https://github.com/apache/beam/pull/10901#issuecomment-591699695 thanks @bobingm This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393843) Time Spent: 2.5h (was: 2h 20m) > WriteToBigQuery failed in BundleBasedDirectRunner > - > > Key: BEAM-8965 > URL: https://issues.apache.org/jira/browse/BEAM-8965 > Project: Beam > Issue Type: Bug > Components: io-py-gcp >Affects Versions: 2.16.0, 2.17.0, 2.18.0, 2.19.0 >Reporter: Wenbing Bai >Assignee: Wenbing Bai >Priority: Major > Time Spent: 2.5h > Remaining Estimate: 0h > > *{{WriteToBigQuery}}* fails in *{{BundleBasedDirectRunner}}* with error > {{PCollection of size 2 with more than one element accessed as a singleton > view.}} > Here is the code > > {code:python} > with Pipeline() as p: > query_results = ( > p > | beam.io.Read(beam.io.BigQuerySource( > query='SELECT ... 
FROM ...') > ) > query_results | beam.io.gcp.WriteToBigQuery( > table=, > method=WriteToBigQuery.Method.FILE_LOADS, > schema={"fields": []} > ) > {code} > > Here is the error > > {code:none} > File "apache_beam/runners/common.py", line 778, in > apache_beam.runners.common.DoFnRunner.process > def process(self, windowed_value): > File "apache_beam/runners/common.py", line 782, in > apache_beam.runners.common.DoFnRunner.process > self._reraise_augmented(exn) > File "apache_beam/runners/common.py", line 849, in > apache_beam.runners.common.DoFnRunner._reraise_augmented > raise_with_traceback(new_exn) > File "apache_beam/runners/common.py", line 780, in > apache_beam.runners.common.DoFnRunner.process > return self.do_fn_invoker.invoke_process(windowed_value) > File "apache_beam/runners/common.py", line 587, in > apache_beam.runners.common.PerWindowInvoker.invoke_process > self._invoke_process_per_window( > File "apache_beam/runners/common.py", line 610, in > apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window > [si[global_window] for si in self.side_inputs])) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/transforms/sideinputs.py", > line 65, in __getitem__ > _FilteringIterable(self._iterable, target_window), self._view_options) > File > "/home/wbai/terra/terra_py2/local/lib/python2.7/site-packages/apache_beam/pvalue.py", > line 443, in _from_runtime_iterable > len(head), str(head[0]), str(head[1]))) > ValueError: PCollection of size 2 with more than one element accessed as a > singleton view. First two elements encountered are > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f", > "gs://temp-dev/temp/bq_load/3edbf2172dd540edb5c8e9597206b10f". [while running > 'WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)'] > {code} > > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
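The `ValueError` in the traceback comes from materializing a multi-element PCollection as a singleton side input. The guard can be sketched in plain Python (illustrative only, not Beam's implementation — and note an empty input is a separate case Beam handles differently):

```python
def as_singleton(iterable):
    """Mimic the check behind 'PCollection of size N with more than one
    element accessed as a singleton view': at most one element allowed."""
    head = list(iterable)[:2]
    if len(head) > 1:
        raise ValueError(
            "PCollection of size %d with more than one element accessed "
            "as a singleton view." % len(head))
    return head[0]

assert as_singleton(["only"]) == "only"
try:
    # Two temp-file paths, as in the report, trigger the error.
    as_singleton(["gs://temp-dev/temp/bq_load/a", "gs://temp-dev/temp/bq_load/b"])
except ValueError as e:
    print(e)
```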
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393868=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393868 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 00:27 Start Date: 27/Feb/20 00:27 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384840768 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -185,6 +200,37 @@ private Unbounded(@Nullable String name, UnboundedSource source) { @Override public final PCollection expand(PBegin input) { source.validate(); + + if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api") + && !ExperimentalOptions.hasExperiment( + input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) { +// We don't use Create here since Create is defined as a BoundedSource and using it would +// cause an infinite expansion loop. We can reconsider this if Create is implemented +// directly as a SplittableDoFn. +PCollection> outputWithIds = +input +.getPipeline() +.apply(Impulse.create()) +.apply( +MapElements.into(new TypeDescriptor>() {}) +.via(element -> (UnboundedSource) source)) +.setCoder( +SerializableCoder.of( +new TypeDescriptor>() {})) +.apply( +ParDo.of( +new UnboundedSourceAsSDFWrapperFn<>( +(Coder) source.getCheckpointMarkCoder( +.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder())); +if (source.requiresDeduping()) { + outputWithIds.apply( + Distinct., byte[]>withRepresentativeValueFn( Review comment: I'm curious what `WindowingStrategy` is going to apply here. If it's using `GlobalWindow`, will it work in streaming mode? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393868) Time Spent: 21h (was: 20h 50m) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 21h > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. > Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393869=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393869 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 00:27 Start Date: 27/Feb/20 00:27 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384846699 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. 
+ @UnboundedPerElement + static class UnboundedSourceAsSDFWrapperFn + extends DoFn, ValueWithRecordId> { + +private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; +private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10; +private final Coder restrictionCoder; + +private UnboundedSourceAsSDFWrapperFn(Coder restrictionCoder) { + this.restrictionCoder = restrictionCoder; +} + +@GetInitialRestriction +public KV, CheckpointT> initialRestriction( +@Element UnboundedSource element) { + return KV.of(element, null); +} + +@GetSize +public double getSize( +@Restriction KV, CheckpointT> restriction, +PipelineOptions pipelineOptions) +throws Exception { + if (restriction.getKey() instanceof EmptyUnboundedSource) { +return 1; + } + + UnboundedReader reader = + restriction.getKey().createReader(pipelineOptions, restriction.getValue()); + long size = reader.getSplitBacklogBytes(); + if (size != UnboundedReader.BACKLOG_UNKNOWN) { +return size; + } + // TODO: Support "global" backlog reporting + // size = reader.getTotalBacklogBytes(); + // if (size != UnboundedReader.BACKLOG_UNKNOWN) { + // return size; + // } + return 1; +} + +@SplitRestriction +public void splitRestriction( +@Restriction KV, CheckpointT> restriction, +OutputReceiver, CheckpointT>> receiver, +PipelineOptions pipelineOptions) +throws Exception { + // The empty unbounded source is trivially done and hence we don't need to output any splits + // for it. + if (restriction.getKey() instanceof EmptyUnboundedSource) { +return; + } + + // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has + // been created. 
+ if (restriction.getValue() != null + && !(restriction.getValue() + instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) { +receiver.output(restriction); + } + + try { +for (UnboundedSource split : +restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) { + receiver.output(KV.of(split, null)); +} + } catch (Exception e) { +receiver.output(restriction); + } +} + +@NewTracker +public RestrictionTracker< +KV, CheckpointT>, UnboundedSourceValue[]> +restrictionTracker( +@Restriction KV, CheckpointT> restriction, +PipelineOptions pipelineOptions) { + return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions); +} + +@ProcessElement +public ProcessContinuation processElement( +RestrictionTracker< +KV, CheckpointT>, UnboundedSourceValue[]> +tracker, +OutputReceiver> receiver, +BundleFinalizer bundleFinalizer) +throws IOException { + UnboundedSourceValue[] out = new UnboundedSourceValue[1]; + while (tracker.tryClaim(out)) { +receiver.outputWithTimestamp( +new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp()); + } + + // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial. + KV, CheckpointT> currentRestriction = + tracker.currentRestriction(); + if (currentRestriction.getValue() != null + && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) { +bundleFinalizer.afterBundleCommit( +
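The wrapper above models the SDF restriction as a (sub-source, checkpoint mark) pair, and the quoted `SplitRestriction` only re-splits while the checkpoint is still trivial. That rule can be sketched in Python (names and the shard-naming scheme are illustrative):

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass(frozen=True)
class Restriction:
    source: Any
    checkpoint: Optional[Any] = None  # None stands in for no/noop checkpoint mark

def split_restriction(restriction, desired_splits=20):
    """Sketch of the SplitRestriction logic quoted above: once a
    meaningful checkpoint mark exists, the restriction may no longer
    be split and is passed through unchanged."""
    if restriction.checkpoint is not None:
        return [restriction]
    return [Restriction(f"{restriction.source}/shard{i}")
            for i in range(desired_splits)]

parts = split_restriction(Restriction("kafka-topic"), desired_splits=3)
print([p.source for p in parts])
```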
[jira] [Work logged] (BEAM-8618) Tear down unused DoFns periodically in Python SDK harness
[ https://issues.apache.org/jira/browse/BEAM-8618?focusedWorklogId=393870=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393870 ] ASF GitHub Bot logged work on BEAM-8618: Author: ASF GitHub Bot Created on: 27/Feb/20 00:28 Start Date: 27/Feb/20 00:28 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10655: [BEAM-8618] Tear down unused DoFns periodically in Python SDK harness. URL: https://github.com/apache/beam/pull/10655 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393870) Time Spent: 5.5h (was: 5h 20m) > Tear down unused DoFns periodically in Python SDK harness > - > > Key: BEAM-8618 > URL: https://issues.apache.org/jira/browse/BEAM-8618 > Project: Beam > Issue Type: Improvement > Components: sdk-py-harness >Reporter: sunjincheng >Assignee: sunjincheng >Priority: Major > Time Spent: 5.5h > Remaining Estimate: 0h > > Per the discussion in the ML, detail can be found [1], the teardown of DoFns > should be supported in the portability framework. It happens at two places: > 1) Upon the control service termination > 2) Tear down the unused DoFns periodically > The aim of this JIRA is to add support for tear down the unused DoFns > periodically in Python SDK harness. > [1] > https://lists.apache.org/thread.html/0c4a4cf83cf2e35c3dfeb9d906e26cd82d3820968ba6f862f91739e4@%3Cdev.beam.apache.org%3E -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046021#comment-17046021 ] Valentyn Tymofieiev edited comment on BEAM-8494 at 2/27/20 1:02 AM: Looks like we'd still have to chase down more failing tests. -- Ran 3172 tests in 863.510s FAILED (SKIP=309, errors=31, failures=1) was (Author: tvalentyn): Looks like we'd have to chase down some failing tests. -- Ran 3172 tests in 863.510s FAILED (SKIP=309, errors=31, failures=1) > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046018#comment-17046018 ] Valentyn Tymofieiev edited comment on BEAM-8494 at 2/27/20 1:02 AM: I tried to install Beam on Py 3.8 and run unit tests. Findings so far: - We need to relax some dependencies (fastavro, pyarrow, pandas), otherwise wheels fail to build on my platform. - For some reason we skip typing_extensions on Python 3.8 [1], and that causes a large number of tests to fail, which otherwise pass if we install this dependency. [1] https://github.com/apache/beam/blob/4a25aa0dcaf19184ac279f566917132f5ae2be9d/sdks/python/setup.py#L172 was (Author: tvalentyn): I tried to install Beam on Py 3.8 and run unit tests. Findings so far: - We need to relax some dependencies (fastavro, pyarrow, pandas), otherwise wheels fail to build on my platform. - It seems like we need typing_extensions for tests to pass. For some reason we skip it on Python 3.8 [1]. [1] https://github.com/apache/beam/blob/4a25aa0dcaf19184ac279f566917132f5ae2be9d/sdks/python/setup.py#L172 > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046021#comment-17046021 ] Valentyn Tymofieiev commented on BEAM-8494: --- Looks like we'd have to chase down some failing tests. -- Ran 3172 tests in 863.510s FAILED (SKIP=309, errors=31, failures=1) > Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8487) Python typehints: support forward references
[ https://issues.apache.org/jira/browse/BEAM-8487?focusedWorklogId=393907=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393907 ] ASF GitHub Bot logged work on BEAM-8487: Author: ASF GitHub Bot Created on: 27/Feb/20 02:04 Start Date: 27/Feb/20 02:04 Worklog Time Spent: 10m Work Description: tvalentyn commented on issue #10932: [BEAM-8487] Handle nested forward references URL: https://github.com/apache/beam/pull/10932#issuecomment-591739717 LGTM, thank you. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393907) Time Spent: 1h 50m (was: 1h 40m) > Python typehints: support forward references > > > Key: BEAM-8487 > URL: https://issues.apache.org/jira/browse/BEAM-8487 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Udi Meiri >Assignee: Udi Meiri >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > Typehints may be given as string literals: > https://www.python.org/dev/peps/pep-0484/#forward-references > These are currently not evaluated and result in errors. 
> Example 1: > {code} > def test_typed_callable_string_hints(self): > def do_fn(element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(do_fn) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > return issubclass(sub, base) > E TypeError: issubclass() arg 2 must be a class or tuple of classes > typehints.py:1168: TypeError > {code} > Example 2: > {code} > def test_typed_dofn_string_hints(self): > class MyDoFn(beam.DoFn): > def process(self, element: 'int') -> 'typehints.List[str]': > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(MyDoFn()) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > This results in: > {code} > > raise ValueError('%s is not iterable' % type_hint) > E ValueError: typehints.List[str] is not iterable > typehints.py:1194: ValueError > {code} > where the non-iterable entity the error refers to is a string literal > ("typehints.List[str]"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393916=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393916 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 27/Feb/20 02:25 Start Date: 27/Feb/20 02:25 Worklog Time Spent: 10m Work Description: udim commented on issue #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#issuecomment-591745730 LGTM, tests already failing here: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393916) Time Spent: 67h 50m (was: 67h 40m) > Add type hints to python code > - > > Key: BEAM-7746 > URL: https://issues.apache.org/jira/browse/BEAM-7746 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chad Dombrova >Assignee: Chad Dombrova >Priority: Major > Time Spent: 67h 50m > Remaining Estimate: 0h > > As a developer of the beam source code, I would like the code to use pep484 > type hints so that I can clearly see what types are required, get completion > in my IDE, and enforce code correctness via a static analyzer like mypy. > This may be considered a precursor to BEAM-7060 > Work has been started here: [https://github.com/apache/beam/pull/9056] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393852=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393852 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 26/Feb/20 23:48 Start Date: 26/Feb/20 23:48 Worklog Time Spent: 10m Work Description: chadrik commented on pull request #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#discussion_r384836306 ## File path: sdks/python/apache_beam/transforms/external_java.py ## @@ -37,18 +39,19 @@ # Protect against environments where apitools library is not available. # pylint: disable=wrong-import-order, wrong-import-position +apiclient = None # type: Optional[types.ModuleType] Review comment: I did some more research on this, and I found this mypy issue: https://github.com/python/mypy/issues/1297 It suggests this idiom: ```python try: from apache_beam.runners.dataflow.internal import apiclient as _apiclient except ImportError: apiclient = None else: apiclient = _apiclient ``` The import is a bit longer and uglier, but it has 2 advantages: - no need to import `Optional` or `ModuleType` - the idiom I was using was actually making `apiclient` a generic ModuleType, dropping all knowledge of the members of `apache_beam.runners.dataflow.internal`. That's bad! The reason this works without explicit `Optional` annotation that mypy will automatically determine optionality in some cases, like this: ```python if some_conditional(): x = None else: x = 1 reveal_type(x) # Revealed type is 'Union[builtins.int, None]' ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393852) Time Spent: 67h 20m (was: 67h 10m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
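The optional-import idiom from the review thread above can be exercised end to end. The sketch below uses `json` as a stand-in for the optional dependency (the real case is `apache_beam.runners.dataflow.internal.apiclient`), so the module name is illustrative only:

```python
# The mypy-friendly idiom: bind the module in an `else` branch so mypy keeps
# the module's real type (instead of a generic ModuleType) and infers
# Optional[...] for the name without an explicit annotation.
try:
    import json as _maybe_json
except ImportError:
    maybe_json = None
else:
    maybe_json = _maybe_json

# Callers guard on None before touching the optional module.
if maybe_json is not None:
    encoded = maybe_json.dumps({"ok": True})
else:
    encoded = None
```

Because the `else` branch rebinds the successfully imported module, attribute access on `maybe_json` is still fully typed inside the `is not None` guard.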
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393873=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393873 ] ASF GitHub Bot logged work on BEAM-7746: Author: ASF GitHub Bot Created on: 27/Feb/20 00:37 Start Date: 27/Feb/20 00:37 Worklog Time Spent: 10m Work Description: chadrik commented on issue #10822: [BEAM-7746] Minor typing updates / fixes URL: https://github.com/apache/beam/pull/10822#issuecomment-591716771 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393873) Time Spent: 67.5h (was: 67h 20m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8494) Python 3.8 Support
[ https://issues.apache.org/jira/browse/BEAM-8494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046018#comment-17046018 ] Valentyn Tymofieiev commented on BEAM-8494: --- I tried to install Beam on Py 3.8 and run unit tests. Findings so far:
- We need to relax some dependencies (fastavro, pyarrow, pandas); otherwise wheels fail to build on my platform.
- It seems like we need typing_extensions for tests to pass. For some reason we skip it on Python 3.8 [1].

[1] https://github.com/apache/beam/blob/4a25aa0dcaf19184ac279f566917132f5ae2be9d/sdks/python/setup.py#L172
> Python 3.8 Support > -- > > Key: BEAM-8494 > URL: https://issues.apache.org/jira/browse/BEAM-8494 > Project: Beam > Issue Type: Improvement > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393897=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393897 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384868736 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. 
+ @UnboundedPerElement + static class UnboundedSourceAsSDFWrapperFn + extends DoFn, ValueWithRecordId> { + +private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; +private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10; +private final Coder restrictionCoder; + +private UnboundedSourceAsSDFWrapperFn(Coder restrictionCoder) { + this.restrictionCoder = restrictionCoder; +} + +@GetInitialRestriction +public KV, CheckpointT> initialRestriction( +@Element UnboundedSource element) { + return KV.of(element, null); +} + +@GetSize +public double getSize( +@Restriction KV, CheckpointT> restriction, +PipelineOptions pipelineOptions) +throws Exception { + if (restriction.getKey() instanceof EmptyUnboundedSource) { +return 1; + } + + UnboundedReader reader = + restriction.getKey().createReader(pipelineOptions, restriction.getValue()); + long size = reader.getSplitBacklogBytes(); + if (size != UnboundedReader.BACKLOG_UNKNOWN) { +return size; + } + // TODO: Support "global" backlog reporting + // size = reader.getTotalBacklogBytes(); + // if (size != UnboundedReader.BACKLOG_UNKNOWN) { + // return size; + // } + return 1; +} + +@SplitRestriction +public void splitRestriction( +@Restriction KV, CheckpointT> restriction, +OutputReceiver, CheckpointT>> receiver, +PipelineOptions pipelineOptions) +throws Exception { + // The empty unbounded source is trivially done and hence we don't need to output any splits + // for it. + if (restriction.getKey() instanceof EmptyUnboundedSource) { +return; + } + + // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has + // been created. 
+ if (restriction.getValue() != null + && !(restriction.getValue() + instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) { +receiver.output(restriction); + } + + try { +for (UnboundedSource split : +restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) { + receiver.output(KV.of(split, null)); +} + } catch (Exception e) { +receiver.output(restriction); + } +} + +@NewTracker +public RestrictionTracker< +KV, CheckpointT>, UnboundedSourceValue[]> +restrictionTracker( +@Restriction KV, CheckpointT> restriction, +PipelineOptions pipelineOptions) { + return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions); +} + +@ProcessElement +public ProcessContinuation processElement( +RestrictionTracker< +KV, CheckpointT>, UnboundedSourceValue[]> +tracker, +OutputReceiver> receiver, +BundleFinalizer bundleFinalizer) +throws IOException { + UnboundedSourceValue[] out = new UnboundedSourceValue[1]; + while (tracker.tryClaim(out)) { +receiver.outputWithTimestamp( +new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp()); + } + + // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial. + KV, CheckpointT> currentRestriction = + tracker.currentRestriction(); + if (currentRestriction.getValue() != null + && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) { +bundleFinalizer.afterBundleCommit( +
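The splitting rule described in the quoted `splitRestriction` comments — keep the restriction whole once a meaningful (non-noop) checkpoint mark exists, otherwise ask the source for sub-sources and fall back to the unsplit restriction if splitting fails — can be sketched in Python. The `source`/`checkpoint` objects and the `is_noop` helper are stand-ins for illustration, not Beam's actual API:

```python
DEFAULT_DESIRED_NUM_SPLITS = 20  # mirrors the constant in the quoted Java


def is_noop(checkpoint):
    # Stand-in for `instanceof NoopCheckpointMark` in the Java code.
    return checkpoint == 'noop'


def split_restriction(source, checkpoint,
                      desired_num_splits=DEFAULT_DESIRED_NUM_SPLITS):
    # A meaningful checkpoint ties reader state to this exact sub-source,
    # so the restriction can no longer be split.
    if checkpoint is not None and not is_noop(checkpoint):
        return [(source, checkpoint)]
    try:
        # Ask the source for sub-sources; each starts with no checkpoint.
        return [(split, None) for split in source.split(desired_num_splits)]
    except Exception:
        # If the source cannot split, fall back to the unsplit restriction.
        return [(source, checkpoint)]
```

This follows the intent stated in the code comments; the exact control flow in the PR under review may differ.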
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393896=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393896 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384867760 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. Review comment: Will file JIRA if I can't do all the watermark reporting passing and implementation in the SDK harness. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393896) Time Spent: 21.5h (was: 21h 20m) > Fn API SDF support > -- > > Key: BEAM-2939 > URL: https://issues.apache.org/jira/browse/BEAM-2939 > Project: Beam > Issue Type: Improvement > Components: beam-model >Reporter: Henning Rohde >Assignee: Luke Cwik >Priority: Major > Labels: portability > Time Spent: 21.5h > Remaining Estimate: 0h > > The Fn API should support streaming SDF. Detailed design TBD. 
> Once design is ready, expand subtasks similarly to BEAM-2822. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393899=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393899 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:41 Start Date: 27/Feb/20 01:41 Worklog Time Spent: 10m Work Description: lukecwik commented on issue #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#issuecomment-591733789 > Looks good. I'd like to see at least one smoke test. There are ValidatesRunner unbounded source tests which will all be converted over. Some of them are failing right now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393899) Time Spent: 21h 40m (was: 21.5h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393898=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393898 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384868638 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException { } } } + + /** + * A splittable {@link DoFn} which executes an {@link UnboundedSource}. + * + * We model the element as the original source and the restriction as a pair of the sub-source + * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as + * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not + * maintain any state. + */ + // TODO: Support reporting the watermark, currently the watermark never advances. + @UnboundedPerElement + static class UnboundedSourceAsSDFWrapperFn + extends DoFn, ValueWithRecordId> { + +private static final int DEFAULT_DESIRED_NUM_SPLITS = 20; Review comment: The default in Dataflow is `4 * first non-null of (maxNumWorkers, numWorkers, 5)`: https://github.com/apache/beam/blob/860131b5d47c830b772d3f1665a26a45ec85ab36/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/CustomSources.java#L51 (I like the TODO in the method) Which is why I went with 20 as the default. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393898) Time Spent: 21.5h (was: 21h 20m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
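The Dataflow sizing heuristic cited above — 4 × the first non-null of (maxNumWorkers, numWorkers, 5), which yields 20 when neither worker count is set and so motivates `DEFAULT_DESIRED_NUM_SPLITS = 20` — can be written as a small helper. The function name is hypothetical:

```python
from typing import Optional


def default_desired_num_splits(max_num_workers: Optional[int],
                               num_workers: Optional[int]) -> int:
    # 4 * first non-null of (maxNumWorkers, numWorkers, 5), per the
    # CustomSources logic referenced in the review comment.
    for workers in (max_num_workers, num_workers):
        if workers is not None:
            return 4 * workers
    return 4 * 5  # falls back to 20, matching DEFAULT_DESIRED_NUM_SPLITS
```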
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393895=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393895 ] ASF GitHub Bot logged work on BEAM-2939: Author: ASF GitHub Bot Created on: 27/Feb/20 01:40 Start Date: 27/Feb/20 01:40 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper URL: https://github.com/apache/beam/pull/10897#discussion_r384867042 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java ## @@ -185,6 +200,37 @@ private Unbounded(@Nullable String name, UnboundedSource source) { @Override public final PCollection expand(PBegin input) { source.validate(); + + if (ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api") + && !ExperimentalOptions.hasExperiment( + input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) { +// We don't use Create here since Create is defined as a BoundedSource and using it would +// cause an infinite expansion loop. We can reconsider this if Create is implemented +// directly as a SplittableDoFn. +PCollection> outputWithIds = +input +.getPipeline() +.apply(Impulse.create()) +.apply( +MapElements.into(new TypeDescriptor>() {}) +.via(element -> (UnboundedSource) source)) +.setCoder( +SerializableCoder.of( +new TypeDescriptor>() {})) +.apply( +ParDo.of( +new UnboundedSourceAsSDFWrapperFn<>( +(Coder) source.getCheckpointMarkCoder( +.setCoder(ValueWithRecordIdCoder.of(source.getOutputCoder())); +if (source.requiresDeduping()) { + outputWithIds.apply( + Distinct., byte[]>withRepresentativeValueFn( Review comment: No it won't, this was for some testing of mine. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393895) Time Spent: 21h 20m (was: 21h 10m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-8599) Establish consensus around how many concurrent minor versions of Python Beam should support, and deprecation policy for older versions.
[ https://issues.apache.org/jira/browse/BEAM-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17046055#comment-17046055 ] Valentyn Tymofieiev commented on BEAM-8599: --- This is being discussed at: [1] https://lists.apache.org/thread.html/rd070afcebff5c967ec3b25d1f7a77db5278992c1508082bf5f636acd%40%3Cdev.beam.apache.org%3E > Establish consensus around how many concurrent minor versions of Python Beam > should support, and deprecation policy for older versions. > > > Key: BEAM-8599 > URL: https://issues.apache.org/jira/browse/BEAM-8599 > Project: Beam > Issue Type: Sub-task > Components: sdk-py-core >Reporter: Valentyn Tymofieiev >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9381) Add display data to BoundedSourceSDF
[ https://issues.apache.org/jira/browse/BEAM-9381?focusedWorklogId=393859=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393859 ] ASF GitHub Bot logged work on BEAM-9381: Author: ASF GitHub Bot Created on: 27/Feb/20 00:02 Start Date: 27/Feb/20 00:02 Worklog Time Spent: 10m Work Description: pabloem commented on issue #10968: [BEAM-9381] Adding display data to BoundedSource SDF URL: https://github.com/apache/beam/pull/10968#issuecomment-591706858 Run Python PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393859) Time Spent: 1h 20m (was: 1h 10m) > Add display data to BoundedSourceSDF > - > > Key: BEAM-9381 > URL: https://issues.apache.org/jira/browse/BEAM-9381 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Pablo Estrada >Assignee: Pablo Estrada >Priority: Major > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393877=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393877 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384853321 ## File path: sdks/python/apache_beam/runners/portability/stager.py ## @@ -547,21 +601,22 @@ def _desired_sdk_filename_in_staging_location(sdk_location): else: return DATAFLOW_SDK_TARBALL_FILE - def _stage_beam_sdk(self, sdk_remote_location, staging_location, temp_dir): -# type: (...) -> List[str] + @staticmethod + def _stage_beam_sdk(sdk_remote_location, temp_dir): Review comment: same This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393877) Time Spent: 3h 20m (was: 3h 10m) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393880=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393880 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384851813 ## File path: runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableRunner.java ## @@ -200,11 +143,44 @@ public PipelineResult run(Pipeline pipeline) { prepareJobResponse.getArtifactStagingEndpoint(); String stagingSessionToken = prepareJobResponse.getStagingSessionToken(); + ImmutableList.Builder filesToStageBuilder = ImmutableList.builder(); Review comment: Perhaps it's worth pulling this out into a separate method? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393880) > Staging artifacts from environment > -- > > Key: BEAM-9056 > URL: https://issues.apache.org/jira/browse/BEAM-9056 > Project: Beam > Issue Type: Sub-task > Components: java-fn-execution >Reporter: Heejong Lee >Assignee: Heejong Lee >Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > staging artifacts from artifact information embedded in environment proto. > detail: > https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393878=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393878 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384849942 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java ## @@ -175,6 +197,90 @@ public static Environment createProcessEnvironment( } } + public static Collection getArtifacts(PipelineOptions options) { +Set pathsToStage = Sets.newHashSet(); +// TODO(heejong): remove jar_packages experimental flag when cross-language dependency +// management is implemented for all runners. +List experiments = options.as(ExperimentalOptions.class).getExperiments(); +if (experiments != null) { + Optional jarPackages = + experiments.stream() + .filter((String flag) -> flag.startsWith("jar_packages=")) + .findFirst(); + jarPackages.ifPresent( + s -> pathsToStage.addAll(Arrays.asList(s.replaceFirst("jar_packages=", "").split(","; +} +List stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage(); +if (stagingFiles == null) { + pathsToStage.addAll( + detectClassPathResourcesToStage(Environments.class.getClassLoader(), options)); + if (pathsToStage.isEmpty()) { +throw new IllegalArgumentException("No classpath elements found."); + } + LOG.debug( + "PortablePipelineOptions.filesToStage was not specified. " + + "Defaulting to files from the classpath: {}", + pathsToStage.size()); +} else { + pathsToStage.addAll(stagingFiles); +} + +ImmutableList.Builder filesToStage = ImmutableList.builder(); +for (String path : pathsToStage) { + File file = new File(path); + if (new File(path).exists()) { +// Spurious items get added to the classpath. 
Filter by just those that exist. +if (file.isDirectory()) { + // Zip up directories so we can upload them to the artifact service. Review comment: Looks like this was the previous behavior. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393878) Time Spent: 3h 20m (was: 3h 10m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
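The staging logic quoted in this thread — drop classpath entries that do not exist on disk, and zip up directories so they can be uploaded to the artifact service — can be sketched in Python. The function name and zip layout are illustrative, not Beam's actual implementation:

```python
import os
import zipfile


def files_to_stage(paths, zip_dir):
    # Spurious items can appear on the classpath; keep only paths that
    # exist, and package directories as zip files for upload.
    staged = []
    for path in paths:
        if not os.path.exists(path):
            continue
        if os.path.isdir(path):
            zip_path = os.path.join(zip_dir, os.path.basename(path) + '.zip')
            with zipfile.ZipFile(zip_path, 'w') as zf:
                for root, _, files in os.walk(path):
                    for name in files:
                        full = os.path.join(root, name)
                        # Store entries relative to the directory root.
                        zf.write(full, os.path.relpath(full, path))
            staged.append(zip_path)
        else:
            staged.append(path)
    return staged
```

Nonexistent entries are silently skipped, matching the "filter by just those that exist" behavior discussed above.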
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393879=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393879 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384853099 ## File path: sdks/python/apache_beam/runners/portability/stager.py ## @@ -377,34 +436,33 @@ def _stage_jar_packages(self, jar_packages, staging_location, temp_dir): for package in local_packages: basename = os.path.basename(package) - staged_path = FileSystems.join(staging_location, basename) - self.stage_artifact(package, staged_path) - resources.append(basename) + resources.append((package, basename)) return resources - def _stage_extra_packages(self, extra_packages, staging_location, temp_dir): -# type: (...) -> List[str] + @staticmethod + def _stage_extra_packages(extra_packages, temp_dir): Review comment: Similarly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393879) Time Spent: 3h 20m (was: 3h 10m) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393875=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393875 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384853067 ## File path: sdks/python/apache_beam/runners/portability/stager.py ## @@ -331,22 +389,23 @@ def _download_file(from_url, to_path): def _is_remote_path(path): return path.find('://') != -1 - def _stage_jar_packages(self, jar_packages, staging_location, temp_dir): -# type: (...) -> List[str] + @staticmethod + def _stage_jar_packages(jar_packages, temp_dir): Review comment: Does this actually stage them, or just add them to the returned list? If not, the name and docstring should be updated. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393875) Time Spent: 3h 10m (was: 3h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9056) Staging artifacts from environment
[ https://issues.apache.org/jira/browse/BEAM-9056?focusedWorklogId=393876=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393876 ] ASF GitHub Bot logged work on BEAM-9056: Author: ASF GitHub Bot Created on: 27/Feb/20 00:47 Start Date: 27/Feb/20 00:47 Worklog Time Spent: 10m Work Description: robertwb commented on pull request #10621: [BEAM-9056] Staging artifacts from environment URL: https://github.com/apache/beam/pull/10621#discussion_r384850669 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ## @@ -88,11 +88,13 @@ public static SdkComponents create(RunnerApi.Components components) { public static SdkComponents create(PipelineOptions options) { SdkComponents sdkComponents = new SdkComponents(RunnerApi.Components.getDefaultInstance(), ""); PortablePipelineOptions portablePipelineOptions = options.as(PortablePipelineOptions.class); -sdkComponents.defaultEnvironmentId = -sdkComponents.registerEnvironment( -Environments.createOrGetDefaultEnvironment( +sdkComponents.registerEnvironment( +Environments.createOrGetDefaultEnvironment( portablePipelineOptions.getDefaultEnvironmentType(), -portablePipelineOptions.getDefaultEnvironmentConfig())); +portablePipelineOptions.getDefaultEnvironmentConfig()) +.toBuilder() +.addAllDependencies(Environments.getArtifacts(options)) Review comment: Would this be better placed in createOrGetDefaultEnvironment? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 393876) Time Spent: 3h 10m (was: 3h) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-2939) Fn API SDF support
[ https://issues.apache.org/jira/browse/BEAM-2939?focusedWorklogId=393889=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393889 ]

ASF GitHub Bot logged work on BEAM-2939:

Author: ASF GitHub Bot
Created on: 27/Feb/20 01:30
Start Date: 27/Feb/20 01:30
Worklog Time Spent: 10m

Work Description: lukecwik commented on pull request #10897: [BEAM-2939] Java UnboundedSource SDF wrapper
URL: https://github.com/apache/beam/pull/10897#discussion_r384866125

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
## @@ -375,4 +422,361 @@ public void checkDone() throws IllegalStateException {
       }
     }
   }
+
+  /**
+   * A splittable {@link DoFn} which executes an {@link UnboundedSource}.
+   *
+   * <p>We model the element as the original source and the restriction as a pair of the sub-source
+   * and its {@link CheckpointMark}. This allows us to split the sub-source over and over as long as
+   * the checkpoint mark is {@code null} or the {@link NoopCheckpointMark} since it does not
+   * maintain any state.
+   */
+  // TODO: Support reporting the watermark, currently the watermark never advances.
+  @UnboundedPerElement
+  static class UnboundedSourceAsSDFWrapperFn<OutputT, CheckpointT extends CheckpointMark>
+      extends DoFn<UnboundedSource<OutputT, CheckpointT>, ValueWithRecordId<OutputT>> {
+
+    private static final int DEFAULT_DESIRED_NUM_SPLITS = 20;
+    private static final int DEFAULT_BUNDLE_FINALIZATION_LIMIT_MINS = 10;
+    private final Coder<CheckpointT> restrictionCoder;
+
+    private UnboundedSourceAsSDFWrapperFn(Coder<CheckpointT> restrictionCoder) {
+      this.restrictionCoder = restrictionCoder;
+    }
+
+    @GetInitialRestriction
+    public KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> initialRestriction(
+        @Element UnboundedSource<OutputT, CheckpointT> element) {
+      return KV.of(element, null);
+    }
+
+    @GetSize
+    public double getSize(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return 1;
+      }
+
+      UnboundedReader<OutputT> reader =
+          restriction.getKey().createReader(pipelineOptions, restriction.getValue());
+      long size = reader.getSplitBacklogBytes();
+      if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+        return size;
+      }
+      // TODO: Support "global" backlog reporting
+      // size = reader.getTotalBacklogBytes();
+      // if (size != UnboundedReader.BACKLOG_UNKNOWN) {
+      //   return size;
+      // }
+      return 1;
+    }
+
+    @SplitRestriction
+    public void splitRestriction(
+        @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+        OutputReceiver<KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>> receiver,
+        PipelineOptions pipelineOptions)
+        throws Exception {
+      // The empty unbounded source is trivially done and hence we don't need to output any splits
+      // for it.
+      if (restriction.getKey() instanceof EmptyUnboundedSource) {
+        return;
+      }
+
+      // The UnboundedSource API does not support splitting after a meaningful checkpoint mark has
+      // been created.
+      if (restriction.getValue() != null
+          && !(restriction.getValue()
+              instanceof UnboundedSource.CheckpointMark.NoopCheckpointMark)) {
+        receiver.output(restriction);
+      }
+
+      try {
+        for (UnboundedSource<OutputT, CheckpointT> split :
+            restriction.getKey().split(DEFAULT_DESIRED_NUM_SPLITS, pipelineOptions)) {
+          receiver.output(KV.of(split, null));
+        }
+      } catch (Exception e) {
+        receiver.output(restriction);
+      }
+    }
+
+    @NewTracker
+    public RestrictionTracker<
+            KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>, UnboundedSourceValue<OutputT>[]>
+        restrictionTracker(
+            @Restriction KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> restriction,
+            PipelineOptions pipelineOptions) {
+      return new UnboundedSourceAsSDFRestrictionTracker(restriction, pipelineOptions);
+    }
+
+    @ProcessElement
+    public ProcessContinuation processElement(
+        RestrictionTracker<
+                KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT>,
+                UnboundedSourceValue<OutputT>[]>
+            tracker,
+        OutputReceiver<ValueWithRecordId<OutputT>> receiver,
+        BundleFinalizer bundleFinalizer)
+        throws IOException {
+      UnboundedSourceValue<OutputT>[] out = new UnboundedSourceValue[1];
+      while (tracker.tryClaim(out)) {
+        receiver.outputWithTimestamp(
+            new ValueWithRecordId<>(out[0].getValue(), out[0].getId()), out[0].getTimestamp());
+      }
+
+      // Add the checkpoint mark to be finalized if the checkpoint mark isn't trivial.
+      KV<UnboundedSource<OutputT, CheckpointT>, CheckpointT> currentRestriction =
+          tracker.currentRestriction();
+      if (currentRestriction.getValue() != null
+          && !(tracker.currentRestriction().getValue() instanceof NoopCheckpointMark)) {
+        bundleFinalizer.afterBundleCommit(
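The element/restriction model in the wrapper above (element = original source, restriction = pair of sub-source and checkpoint mark, splittable only while the checkpoint mark is trivial) can be sketched as a small Python toy model. All names here are hypothetical illustrations of the idea, not Beam APIs; the real implementation is the Java splitRestriction method quoted above:

```python
class FakeSource:
    """Stand-in for an UnboundedSource that can split itself into sub-sources."""

    def __init__(self, shards):
        self.shards = shards

    def split(self, desired_num_splits):
        return self.shards[:desired_num_splits]


def split_restriction(restriction, desired_num_splits=20):
    """Toy model of the wrapper's splitRestriction.

    A restriction is a (source, checkpoint_mark) pair. Once a meaningful
    checkpoint mark exists, the UnboundedSource API cannot split past it,
    so the restriction is kept whole; otherwise the source's own split()
    is used and each sub-source starts with an empty (None) checkpoint.
    """
    source, checkpoint = restriction
    if checkpoint is not None:
        return [restriction]
    return [(sub, None) for sub in source.split(desired_num_splits)]
```

This captures why the wrapper can "split the sub-source over and over" only while the checkpoint mark is null or a no-op mark.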
[jira] [Work logged] (BEAM-7746) Add type hints to python code
[ https://issues.apache.org/jira/browse/BEAM-7746?focusedWorklogId=393903=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393903 ]

ASF GitHub Bot logged work on BEAM-7746:

Author: ASF GitHub Bot
Created on: 27/Feb/20 01:48
Start Date: 27/Feb/20 01:48
Worklog Time Spent: 10m

Work Description: chadrik commented on issue #10822: [BEAM-7746] Minor typing updates / fixes
URL: https://github.com/apache/beam/pull/10822#issuecomment-591735657

@robertwb I don't think the test failures are my fault because they were passing before I rebased onto master...

Issue Time Tracking
---
Worklog Id: (was: 393903)
Time Spent: 67h 40m (was: 67.5h)

> Add type hints to python code
> -----------------------------
>
>                 Key: BEAM-7746
>                 URL: https://issues.apache.org/jira/browse/BEAM-7746
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Chad Dombrova
>            Assignee: Chad Dombrova
>            Priority: Major
>          Time Spent: 67h 40m
>  Remaining Estimate: 0h
>
> As a developer of the Beam source code, I would like the code to use PEP 484
> type hints so that I can clearly see what types are required, get completion
> in my IDE, and enforce code correctness via a static analyzer like mypy.
> This may be considered a precursor to BEAM-7060.
> Work has been started here: https://github.com/apache/beam/pull/9056
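As a small generic illustration of what the PEP 484 annotations requested in this issue buy (this is an example, not code from the Beam PR): a statically typed helper lets mypy check call sites and IDEs offer completion, with no runtime cost.

```python
from typing import Iterable, Optional


def first_matching(flags: Iterable[str], prefix: str) -> Optional[str]:
    """Return the first flag starting with `prefix`, or None if there is none.

    With PEP 484 hints, mypy can flag a call like first_matching(42, "x")
    before the code ever runs, and IDEs know the result may be None.
    """
    for flag in flags:
        if flag.startswith(prefix):
            return flag
    return None
```

The `Optional[str]` return type in particular forces callers to handle the None case, which is exactly the kind of correctness check the issue description mentions.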
[jira] [Work logged] (BEAM-8841) Add ability to perform BigQuery file loads using avro
[ https://issues.apache.org/jira/browse/BEAM-8841?focusedWorklogId=393904=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393904 ]

ASF GitHub Bot logged work on BEAM-8841:

Author: ASF GitHub Bot
Created on: 27/Feb/20 01:48
Start Date: 27/Feb/20 01:48
Worklog Time Spent: 10m

Work Description: pabloem commented on issue #10979: [BEAM-8841] Support writing data to BigQuery via Avro in Python SDK
URL: https://github.com/apache/beam/pull/10979#issuecomment-591735822

Run Python 3.5 PostCommit

Issue Time Tracking
---
Worklog Id: (was: 393904)
Time Spent: 2h 10m (was: 2h)

> Add ability to perform BigQuery file loads using avro
> -----------------------------------------------------
>
>                 Key: BEAM-8841
>                 URL: https://issues.apache.org/jira/browse/BEAM-8841
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-py-gcp
>            Reporter: Chun Yang
>            Assignee: Chun Yang
>            Priority: Minor
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Currently, JSON format is used for file loads into BigQuery in the Python
> SDK. JSON has some disadvantages, including the size of serialized data and
> the inability to represent NaN and infinity float values.
> BigQuery supports loading files in Avro format, which can overcome these
> disadvantages. The Java SDK already supports loading files using Avro format
> (BEAM-2879), so it makes sense to support it in the Python SDK as well.
> The change will be somewhere around
> [{{BigQueryBatchFileLoads}}|https://github.com/apache/beam/blob/3e7865ee6c6a56e51199515ec5b4b16de1ddd166/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py#L554].
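The NaN/infinity limitation cited in the issue is easy to demonstrate with Python's standard json module: the JSON specification has no representation for these values, and Python only emits them via a non-standard extension (Avro, by contrast, stores them as ordinary IEEE-754 floats):

```python
import json

# Python's default behavior emits the non-standard token 'NaN',
# which strict JSON parsers (and BigQuery JSON loads) reject.
nonstandard = json.dumps(float("nan"))

# With allow_nan=False, the encoder enforces the JSON spec and refuses
# to serialize infinity at all.
strict_rejects = False
try:
    json.dumps(float("inf"), allow_nan=False)
except ValueError:
    strict_rejects = True
```

So any pipeline value that is NaN or +/-infinity simply cannot round-trip through a spec-compliant JSON load file, which motivates the Avro-based path this PR adds.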
[jira] [Work logged] (BEAM-9322) Python SDK ignores manually set PCollection tags
[ https://issues.apache.org/jira/browse/BEAM-9322?focusedWorklogId=393914=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-393914 ]

ASF GitHub Bot logged work on BEAM-9322:

Author: ASF GitHub Bot
Created on: 27/Feb/20 02:21
Start Date: 27/Feb/20 02:21
Worklog Time Spent: 10m

Work Description: udim commented on issue #10934: [BEAM-9322] [BEAM-1833] Broke some people, setting the default to have the experiment be disabled
URL: https://github.com/apache/beam/pull/10934#issuecomment-591741341

This change may have broken precommits: https://builds.apache.org/job/beam_PreCommit_Python_Cron/2443/
edit: actually, not quite sure

Issue Time Tracking
---
Worklog Id: (was: 393914)
Time Spent: 2h 10m (was: 2h)

> Python SDK ignores manually set PCollection tags
> ------------------------------------------------
>
>                 Key: BEAM-9322
>                 URL: https://issues.apache.org/jira/browse/BEAM-9322
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Sam Rohde
>            Assignee: Sam Rohde
>            Priority: Major
>             Fix For: 2.20.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The Python SDK currently ignores any tags set manually on PCollections when
> applying PTransforms, at the point where the PCollections are added to the
> PTransform's
> [outputs|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L595].
> In the
> [add_output|https://github.com/apache/beam/blob/688a4ea53f315ec2aa2d37602fd78496fca8bb4f/sdks/python/apache_beam/pipeline.py#L872]
> method, the tag is set to None for all PValues, so the output tags become an
> enumeration index over the PCollection outputs. The tags are not propagated
> correctly, which is a problem for code that relies on the output PCollection
> tags matching the user-set values.
> The fix is to correct BEAM-1833 and always pass in the tags. However, that
> doesn't fix the problem for nested PCollections. If you have a dict of lists
> of PCollections, what should their tags be set to? The plan is to first
> propagate the correct tag and then discuss with the community what the best
> auto-generated tags would be.
> Some users may rely on the old implementation, so a flag will be created,
> "force_generated_pcollection_output_ids", set to False by default. If True,
> it will restore the old behavior of generating tags for PCollections.
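The bug described in this issue can be modeled with a tiny Python sketch (function names and data are hypothetical illustrations, not Beam internals): the buggy path discards user-set tags and substitutes an enumeration index, while the fix propagates the tags unchanged.

```python
def enumerated_tags(outputs):
    """Model of the buggy behavior: user-set tags on output PCollections
    are ignored and replaced by an enumeration index over the outputs."""
    return {str(i): pcoll for i, (tag, pcoll) in enumerate(outputs.items())}


def preserved_tags(outputs):
    """Model of the intended fix: user-set tags are propagated as-is."""
    return dict(outputs)
```

Downstream code that looks up outputs by name (e.g. `results["errors"]`) works only under the second behavior, which is why the issue proposes propagating tags by default and gating the old enumeration behind the "force_generated_pcollection_output_ids" flag.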