[GitHub] [beam] tvalentyn commented on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.
tvalentyn commented on pull request #13219: URL: https://github.com/apache/beam/pull/13219#issuecomment-718394925 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tvalentyn removed a comment on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.
tvalentyn removed a comment on pull request #13219: URL: https://github.com/apache/beam/pull/13219#issuecomment-718394306
[GitHub] [beam] tvalentyn commented on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.
tvalentyn commented on pull request #13219: URL: https://github.com/apache/beam/pull/13219#issuecomment-718394558 Run Python Dataflow ValidatesContainer
[GitHub] [beam] tvalentyn commented on pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.
tvalentyn commented on pull request #13219: URL: https://github.com/apache/beam/pull/13219#issuecomment-718394306 Run Python Dataflow ValidatesContainer
[GitHub] [beam] tvalentyn opened a new pull request #13219: Use a 'slim' flavor for Python images to reduce the image size.
tvalentyn opened a new pull request #13219: URL: https://github.com/apache/beam/pull/13219

We could reduce the size of Python container images by switching to a slim version of the base image. We have been building Dataflow Python containers from 'slim' versions for a while and haven't encountered any issues. A slim version shaves roughly 750 MB of unpacked image size, as per `docker images`:

```
python   3.7-slim-buster   217e85391449   8 days ago   112MB
python   3.7-buster        b0dee8d708b9   8 days ago   876MB
```

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
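At its simplest, switching flavors is a change to the base image in the container's Dockerfile. The fragment below is an illustrative sketch only, not Beam's actual Python container Dockerfile; the commented `apt-get` line is a hypothetical workaround for packages that must compile C extensions, since slim images drop the build toolchain:

```
# Illustrative sketch; Beam's real Python container Dockerfile differs.
FROM python:3.7-slim-buster

# slim images omit build tools, so wheels with C extensions that must be
# built from source may need the toolchain installed explicitly, e.g.:
# RUN apt-get update && apt-get install -y --no-install-recommends gcc \
#     && rm -rf /var/lib/apt/lists/*
```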
Post-Commit Tests Status (on master branch): [CI badge table omitted]
[GitHub] [beam] youngoli merged pull request #13188: [BEAM-11108] Add a version of TextIO implemented via SDF.
youngoli merged pull request #13188: URL: https://github.com/apache/beam/pull/13188
[GitHub] [beam] youngoli commented on a change in pull request #13209: [BEAM-9615] Add schema coders and tests.
youngoli commented on a change in pull request #13209: URL: https://github.com/apache/beam/pull/13209#discussion_r514010115

## File path: sdks/go/pkg/beam/coder_test.go

```diff
@@ -75,46 +134,51 @@ func TestCoders(t *testing.T) {
 		1,
 		true,
 		"a string",
-		map[int64]string{1: "one", 11: "oneone", 21: "twoone", 1211: "onetwooneone"},
+		map[int64]string{1: "one", 11: "oneone", 21: "twoone", 1211: "onetwooneone"}, // (not supported by custom type registration)
 		struct {
 			A int
 			B *string
 			C bool
 		}{4, &ptrString, false},
-		[...]int64{1, 2, 3, 4, 5}, // array
+		[...]int64{1, 2, 3, 4, 5}, // array (not supported by custom type registration)
 		[]int64{1, 2, 3, 4, 5}, // slice
 		struct {
 			A []int
 			B [3]int
 		}{A: []int{1, 2, 3}, B: [...]int{4, 5, 6}},
+		[...]struct{ A int }{{1}, {2}, {3}, {4}, {5}},
+		[]struct{ B int }{{1}, {2}, {3}, {4}, {5}},
+		regTestType{[4]int{4, 2, 4, 2}},
 	}
 	for _, test := range tests {
-		var results []string
-		rt := reflect.TypeOf(test)
-		enc := NewElementEncoder(rt)
-		for i := 0; i < 10; i++ {
-			var buf bytes.Buffer
-			if err := enc.Encode(test, &buf); err != nil {
-				t.Fatalf("Failed to encode %v: %v", tests, err)
+		t.Run(fmt.Sprintf("%T", test), func(t *testing.T) {
+			var results []string
+			rt := reflect.TypeOf(test)
+			enc := NewElementEncoder(rt)
+			for i := 0; i < 10; i++ {
+				var buf bytes.Buffer
+				if err := enc.Encode(test, &buf); err != nil {
+					t.Fatalf("Failed to encode %v: %v", test, err)
+				}
+				results = append(results, string(buf.Bytes()))
 			}
-			results = append(results, string(buf.Bytes()))
-		}
-		for i, d := range results {
-			if d != results[0] {
-				t.Errorf("coder not deterministic: encoding %d not the same as the first encoding: %v != %v ", i, d, results[0])
+			for i, d := range results {
+				if d != results[0] {
+					t.Errorf("coder not deterministic: encoding %d not the same as the first encoding: %v != %v ", i, d, results[0])
+				}
 			}
-		}
-		dec := NewElementDecoder(rt)
-		buf := bytes.NewBuffer([]byte(results[0]))
-		decoded, err := dec.Decode(buf)
-		if err != nil {
-			t.Fatalf("Failed to decode: %v", err)
-		}
+			dec := NewElementDecoder(rt)
+			buf := bytes.NewBuffer([]byte(results[0]))
+			decoded, err := dec.Decode(buf)
+			if err != nil {
+				t.Fatalf("Failed to decode: %q, into %v", results[0], err)
```

Review comment: Is the `%v` in the format string supposed to be the error? It reads like it's meant to output the expected decoded value (i.e. `test`).
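The diff above exercises a determinism property: encoding the same value repeatedly must produce byte-identical output. That pattern can be sketched outside Beam; the snippet below uses the standard library's `encoding/gob` as a stand-in encoder (it is not Beam's `NewElementEncoder`, which is what the real test checks):

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

type pair struct {
	A int
	B string
}

// encodeN encodes v n times with fresh encoders and reports whether
// every encoding is byte-identical to the first one, the same
// determinism property the coder test asserts.
func encodeN(v interface{}, n int) (bool, error) {
	var first []byte
	for i := 0; i < n; i++ {
		var buf bytes.Buffer
		if err := gob.NewEncoder(&buf).Encode(v); err != nil {
			return false, err
		}
		if i == 0 {
			first = append([]byte(nil), buf.Bytes()...)
		} else if !bytes.Equal(first, buf.Bytes()) {
			return false, nil
		}
	}
	return true, nil
}

func main() {
	ok, err := encodeN(pair{4, "x"}, 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(ok) // gob output for a fixed struct value is stable
}
```

The real test additionally wraps each case in `t.Run(fmt.Sprintf("%T", test), ...)` so each type reports as its own subtest, which is what the reviewed change introduces.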
[GitHub] [beam] ajamato commented on pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics
ajamato commented on pull request #13217: URL: https://github.com/apache/beam/pull/13217#issuecomment-718382171 Run Python PreCommit
[GitHub] [beam] ibzib opened a new pull request #13218: [BEAM-10974] Skip GroupByKeyTest.testLargeKeys10MB.
ibzib opened a new pull request #13218: URL: https://github.com/apache/beam/pull/13218

I tried changing the taskmanager memory size, but couldn't get it to work.

R: @mxm
cc: @tysonjh

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch): [CI badge table omitted]
[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally
kennknowles commented on pull request #13134: URL: https://github.com/apache/beam/pull/13134#issuecomment-718335567 Run Java PreCommit
[GitHub] [beam] boyuanzz commented on a change in pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.
boyuanzz commented on a change in pull request #13105: URL: https://github.com/apache/beam/pull/13105#discussion_r513912395

## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleCheckpointHandlers.java

@@ -0,0 +1,138 @@

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.runners.fnexecution.control;

import com.google.common.collect.ImmutableList;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utility methods for creating {@link BundleCheckpointHandler}s. */
public class BundleCheckpointHandlers {

  /**
   * A {@link BundleCheckpointHandler} which uses {@link
   * org.apache.beam.runners.core.TimerInternals.TimerData} and {@link
   * org.apache.beam.sdk.state.ValueState} to reschedule {@link DelayedBundleApplication}.
   */
  public static class StateAndTimerBundleCheckpointHandler<T> implements BundleCheckpointHandler {
    private static final Logger LOG =
        LoggerFactory.getLogger(StateAndTimerBundleCheckpointHandler.class);
    private final TimerInternalsFactory<T> timerInternalsFactory;
    private final StateInternalsFactory<T> stateInternalsFactory;
    private final Coder<WindowedValue<T>> residualCoder;
    private final Coder<BoundedWindow> windowCoder;
    private final IdGenerator idGenerator = IdGenerators.incrementingLongs();
    public static final String SDF_PREFIX = "sdf_checkpoint";

    public StateAndTimerBundleCheckpointHandler(
        TimerInternalsFactory<T> timerInternalsFactory,
        StateInternalsFactory<T> stateInternalsFactory,
        Coder<WindowedValue<T>> residualCoder,
        Coder<BoundedWindow> windowCoder) {
      this.residualCoder = residualCoder;
      this.windowCoder = windowCoder;
      this.timerInternalsFactory = timerInternalsFactory;
      this.stateInternalsFactory = stateInternalsFactory;
    }

    /**
     * A helper function to help check whether the given timer is the timer which is set for
     * rescheduling {@link DelayedBundleApplication}.
     */
    public static boolean isSdfTimer(String timerId) {
      return timerId.startsWith(SDF_PREFIX);
    }

    private static String constructSdfCheckpointId(String id, int index) {
      return SDF_PREFIX + ":" + id + ":" + index;
    }

    @Override
    public void onCheckpoint(ProcessBundleResponse response) {
      String id = idGenerator.getId();
      for (int index = 0; index < response.getResidualRootsCount(); index++) {
        DelayedBundleApplication residual = response.getResidualRoots(index);
        if (!residual.hasApplication()) {
          continue;
        }
        String tag = constructSdfCheckpointId(id, index);
        try {
          WindowedValue<T> stateValue =
              CoderUtils.decodeFromByteArray(
                  residualCoder, residual.getApplication().getElement().toByteArray());
          TimerInternals timerInternals =
              timerInternalsFactory.timerInternalsForKey((stateValue.getValue()));
          StateInternals stateInternals =
              stateInternalsFactory.stateInternalsForKey(stateValue.getValue());
          // Calculate the timestamp for the timer.
          Instant timestamp = Instant.now();
          if (residual.hasRequestedTimeDelay()) {
            timestamp = timestamp.plus(res
```
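The `constructSdfCheckpointId`/`isSdfTimer` pair above implements a reserved-prefix naming scheme: every timer set for rescheduling a `DelayedBundleApplication` carries the `sdf_checkpoint` prefix, so the runner can distinguish checkpoint timers from user timers by inspecting the ID alone. A standalone transliteration of just that scheme (in Go, purely illustrative, not a Beam API):

```go
package main

import (
	"fmt"
	"strings"
)

// sdfPrefix mirrors the SDF_PREFIX constant in the Java handler above.
const sdfPrefix = "sdf_checkpoint"

// checkpointID mirrors constructSdfCheckpointId: prefix, bundle id,
// and the index of the residual root, joined by colons.
func checkpointID(id string, index int) string {
	return fmt.Sprintf("%s:%s:%d", sdfPrefix, id, index)
}

// isSdfTimer mirrors the Java helper: a timer belongs to SDF
// checkpointing iff its id carries the reserved prefix.
func isSdfTimer(timerID string) bool {
	return strings.HasPrefix(timerID, sdfPrefix)
}

func main() {
	id := checkpointID("42", 0)
	fmt.Println(id, isSdfTimer(id), isSdfTimer("user_timer"))
	// prints: sdf_checkpoint:42:0 true false
}
```

The design choice is cheap and stateless: no registry of checkpoint timers is needed, at the cost of reserving the prefix so user timer IDs must never start with it.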
[GitHub] [beam] boyuanzz commented on a change in pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
boyuanzz commented on a change in pull request #12806: URL: https://github.com/apache/beam/pull/12806#discussion_r513895196

## File path: sdks/python/apache_beam/io/gcp/pubsub.py

```diff
@@ -418,10 +439,11 @@
   def __init__(self,
                id_label,  # type: Optional[str]
                with_attributes,  # type: bool
                timestamp_attribute  # type: Optional[str]
-               ):
+               ):
     self.coder = coders.BytesCoder()
     self.full_topic = topic
     self.id_label = id_label
+    # TODO(BEAM-10869): Remove with_attributes since we will never look at it.
```

Review comment: We can remove `with_attributes` from `PubSubSink` in the cleanup PR. @chamikaramj
[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally
kennknowles commented on pull request #13134: URL: https://github.com/apache/beam/pull/13134#issuecomment-718315067 run java precommit
[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally
kennknowles commented on pull request #13134: URL: https://github.com/apache/beam/pull/13134#issuecomment-718315024 I'm doing something like that on my laptop. Certainly the core SDK compile is slower, but even with a huge slowdown in compile the test execution should dominate. So I do think it may be caching that is the difference. Running it a few times to warm the build cache might also give a sense of realistic CI times.
[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally
kennknowles commented on pull request #13134: URL: https://github.com/apache/beam/pull/13134#issuecomment-718303211

Java precommit seems to have degraded in runtime.

- Before: https://scans.gradle.com/s/gohimd5ekkcmo/timeline
- After: https://scans.gradle.com/s/z4brjsxfpftxy/timeline

So it went from 40 minutes to 90 minutes. It isn't apples-to-apples, as we have a lot of caching effects since they are just running on Jenkins:

- Before: `2375 tasks executed in 116 projects in 34m 18.746s, with 162 avoided tasks saving 1h 19.326s`
- After: `2463 tasks executed in 116 projects in 1h 20m 11.508s, with 38 avoided tasks saving 14m 48.735s`

I think zero-cache builds in some semi-controlled environment might give a better sense of whether this needs to be treated specially.
[GitHub] [beam] ajamato commented on pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics
ajamato commented on pull request #13217: URL: https://github.com/apache/beam/pull/13217#issuecomment-718301011 R: @chamikaramj
[GitHub] [beam] ajamato commented on pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics
ajamato commented on pull request #13217: URL: https://github.com/apache/beam/pull/13217#issuecomment-718300927 R: @pabloem
[GitHub] [beam] ajamato opened a new pull request #13217: [BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics
ajamato opened a new pull request #13217: URL: https://github.com/apache/beam/pull/13217

[BEAM-11092] Add bigquery io request count metric, implementing HarnessMonitoringInfos and process_wide metrics

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch): [CI badge table omitted]
[GitHub] [beam] codecov[bot] edited a comment on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
codecov[bot] edited a comment on pull request #12806: URL: https://github.com/apache/beam/pull/12806#issuecomment-692876818

# [Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=h1) Report

> Merging [#12806](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **decrease** coverage by `42.05%`.
> The diff coverage is `16.00%`.

```diff
@@             Coverage Diff             @@
##           master   #12806       +/-   ##
===========================================
- Coverage   82.48%   40.42%   -42.06%
===========================================
  Files         455      449        -6
  Lines       54876    53539     -1337
===========================================
- Hits        45266    21645    -23621
- Misses       9610    31894    +22284
```

[Impacted-files table omitted; see the full report.]

[Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=continue). Last update [376d2a6...7a309c2](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [beam] y1chi commented on pull request #13216: Update beam-2.25.0.md
y1chi commented on pull request #13216: URL: https://github.com/apache/beam/pull/13216#issuecomment-718284437 R: @robinyqiu This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi opened a new pull request #13216: Update beam-2.25.0.md
y1chi opened a new pull request #13216: URL: https://github.com/apache/beam/pull/13216 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): table of CI build-status badges (Jenkins jobs at ci-beam.apache.org) for the Go, Java, and Python SDKs across the Dataflow, Flink, Samza, Spark, and Twister2 runners; badge links truncated in the archive.
[GitHub] [beam] rohdesamuel opened a new pull request #13215: [BEAM-11151] Adds the ToStringFnRunner to Java
rohdesamuel opened a new pull request #13215: URL: https://github.com/apache/beam/pull/13215 This adds the ToString implementation to the Java SDK Harness. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): table of CI build-status badges (Jenkins jobs at ci-beam.apache.org) for the Go, Java, and Python SDKs across the Dataflow, Flink, Samza, Spark, and Twister2 runners; badge links truncated in the archive.
[GitHub] [beam] amaliujia commented on pull request #13094: [BEAM-5570] Update javacc dependency
amaliujia commented on pull request #13094: URL: https://github.com/apache/beam/pull/13094#issuecomment-718281226 @iemejia do you have any other comments on this PR? I am planning to merge it early next week. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] amaliujia edited a comment on pull request #13094: [BEAM-5570] Update javacc dependency
amaliujia edited a comment on pull request #13094: URL: https://github.com/apache/beam/pull/13094#issuecomment-718281226 @iemejia do you have any other comments on this PR? I am planning to merge this PR early next week if there are no more comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tvalentyn merged pull request #13193: [BEAM-7372][BEAM-8371][BEAM-9372] drop python 2.7 and 3.5 support from python container
tvalentyn merged pull request #13193: URL: https://github.com/apache/beam/pull/13193 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tvalentyn commented on a change in pull request #13187: Updated shared.py comments
tvalentyn commented on a change in pull request #13187: URL: https://github.com/apache/beam/pull/13187#discussion_r513823318

## File path: sdks/python/apache_beam/utils/shared.py

```diff
@@ -26,22 +26,31 @@
 To share a very large list across all threads of each worker in a DoFn::
 
-  class GetNthStringFn(beam.DoFn):
-    def __init__(self, shared_handle):
-      self._shared_handle = shared_handle
-
-    def process(self, element):
-      def initialize_list():
-        # Build the giant initial list.
-        return [str(i) for i in range(100)]
-
-      giant_list = self._shared_handle.acquire(initialize_list)
-      yield giant_list[element]
-
-  p = beam.Pipeline()
-  shared_handle = shared.Shared()
-  (p | beam.Create([2, 4, 6, 8])
-     | beam.ParDo(GetNthStringFn(shared_handle)))
+# Several built-in types such as list and dict do not directly support weak
```

Review comment: Consider the following wording for line 29 to prepare the reader for why weak references are discussed: Shared is a helper class for managing a single instance of an object shared by multiple threads within the same process. Instances of Shared are serializable objects that can be shared by all threads of each worker process. A Shared object encapsulates a weak reference to a singleton instance of the shared resource. The singleton is lazily initialized by calls to Shared.acquire(). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.
boyuanzz commented on pull request #12572: URL: https://github.com/apache/beam/pull/12572#issuecomment-718267176 I manually tested this transform with Dataflow runner v2 and I can confirm that the transform works as expected. However, it's not easy to write an E2E test, since the transform doesn't output values and it interacts with an external system. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] yifanmai commented on a change in pull request #13177: Sort stages according to data edges as well as must-follows.
yifanmai commented on a change in pull request #13177: URL: https://github.com/apache/beam/pull/13177#discussion_r511169718

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py

```diff
@@ -1702,13 +1702,26 @@ def sort_stages(stages, pipeline_context):
   seen = set()  # type: Set[Stage]
   ordered = []
+  producers = {
+      pcoll: stage
+      for stage in all_stages for t in stage.transforms
```

Review comment: Do we need to traverse into sub-transforms to fetch their inputs, or are we guaranteed that all transforms are leaf transforms? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
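The `producers` mapping in this hunk drives a producer-first topological ordering of stages. A simplified, self-contained sketch of that idea (plain dicts stand in for Beam's `Stage` objects; all names here are hypothetical, not the actual translation code):

```python
def sort_stages(stages):
    # Map each output "pcollection" to the stage that produces it
    # (assumes every pcollection has exactly one producing stage).
    producers = {out: s for s in stages for out in s['outputs']}
    seen, ordered = set(), []

    def process(stage):
        if id(stage) in seen:
            return
        seen.add(id(stage))
        # Visit the producers of our inputs first, so every stage sorts
        # after the stages that feed it data.
        for inp in stage['inputs']:
            if inp in producers:
                process(producers[inp])
        ordered.append(stage)

    for s in stages:
        process(s)
    return ordered


a = {'name': 'read', 'inputs': [], 'outputs': ['p1']}
b = {'name': 'parse', 'inputs': ['p1'], 'outputs': ['p2']}
c = {'name': 'write', 'inputs': ['p2'], 'outputs': []}
# Regardless of the starting order, data edges force read -> parse -> write.
assert [s['name'] for s in sort_stages([c, b, a])] == ['read', 'parse', 'write']
```

The reviewer's question about sub-transforms corresponds to whether the dictionary comprehension over `stage.transforms` sees every input-producing transform, or only top-level ones.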
[GitHub] [beam] boyuanzz commented on pull request #12572: [BEAM-10123] Add Kafka Commit transform.
boyuanzz commented on pull request #12572: URL: https://github.com/apache/beam/pull/12572#issuecomment-718258631 > > Yeah, I also want to have some tests around this, but it may not be possible to involve an actual Kafka consumer to do so. The approach I can come up with is to use a mock, but I'm not sure whether it's feasible. > > Can we add "SDF Read"-related tests to `KafkaIOIT` in this case? Sorry for the delay; I was on something urgent. KafkaIOIT will use `SDF Read` automatically if the runner is using `beam_fn_api` and `use_sdf_kafka_read`. In the near future, we will make `SDF Read` the default for Kafka. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
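For the mock-based idea floated in the quoted reply, a side-effect-only step can still be asserted through a mock even though it produces no output. This is only a sketch of the testing pattern, not the actual transform (which is Java); the function and offset names are hypothetical:

```python
from unittest import mock


def commit_offsets(consumer, offsets):
    # Hypothetical stand-in for the commit step's side effect: it returns
    # nothing and only calls into the external Kafka client.
    consumer.commitSync(offsets)


consumer = mock.Mock()
commit_offsets(consumer, {'topic-partition-0': 42})
# The mock records the interaction, so the side effect is assertable
# even though the step has no output values.
consumer.commitSync.assert_called_once_with({'topic-partition-0': 42})
```

This sidesteps the E2E difficulty the comment describes: the external system is replaced by an object that records calls instead of performing them.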
[GitHub] [beam] pabloem commented on pull request #13201: Updating BigQuery client for Python
pabloem commented on pull request #13201: URL: https://github.com/apache/beam/pull/13201#issuecomment-718228953 Run Python_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.
amaliujia commented on pull request #13200: URL: https://github.com/apache/beam/pull/13200#issuecomment-718252195 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] TheNeuralBit commented on pull request #13066: [BEAM-11052] Memoize to_pcollection
TheNeuralBit commented on pull request #13066: URL: https://github.com/apache/beam/pull/13066#issuecomment-718251075 Rebased to resolve merge conflicts This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] tvalentyn commented on a change in pull request #13142: [BEAM-5939] - Deduplicate constants
tvalentyn commented on a change in pull request #13142: URL: https://github.com/apache/beam/pull/13142#discussion_r513802306

## File path: sdks/python/apache_beam/runners/dataflow/internal/names.py

```diff
@@ -27,7 +27,13 @@
 # Standard file names used for staging files.
 from builtins import object
 
-DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
+# pylint: disable=unused-import
+from apache_beam.runners.internal.names import DATAFLOW_SDK_TARBALL_FILE
+from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE
```

Review comment: I think we could avoid importing all of these except for PICKLED_MAIN_SESSION_FILE, and update all references in the Beam codebase to use apache_beam.runners.internal.names instead, similar to what we do in apiclient.py.

```
# pylint: disable=unused-import
# Used by Dataflow legacy worker.
from apache_beam.runners.internal.names import PICKLED_MAIN_SESSION_FILE
```

## File path: sdks/python/apache_beam/runners/internal/names.py

```diff
@@ -20,8 +20,11 @@
 # All constants are for internal use only; no backwards-compatibility
 # guarantees.
 
+DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
```

Review comment: Let's rename this to: `STAGED_SDK_SOURCES_FILENAME = 'dataflow_python_sdk.tar'  # Current value is hardcoded in Dataflow internal infrastructure; please don't change without a review from Dataflow maintainers.` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
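The re-export pattern the reviewer suggests (one canonical definition, with the legacy module importing it for backwards compatibility) can be illustrated with a small, self-contained simulation. The module names below are stand-ins created on the fly, not the actual Beam modules:

```python
import sys
import types

# Simulate the canonical module that owns the constant.
canonical = types.ModuleType('canonical_names')
canonical.STAGED_SDK_SOURCES_FILENAME = 'dataflow_python_sdk.tar'
sys.modules['canonical_names'] = canonical

# The legacy module keeps working by re-exporting, instead of
# redefining, the constant.
legacy_source = (
    "# pylint: disable=unused-import\n"
    "from canonical_names import STAGED_SDK_SOURCES_FILENAME\n")
legacy = types.ModuleType('legacy_names')
exec(legacy_source, legacy.__dict__)

# Both module paths now resolve to the single definition, so the value
# can be deduplicated without breaking existing import sites.
assert legacy.STAGED_SDK_SOURCES_FILENAME is canonical.STAGED_SDK_SOURCES_FILENAME
```

The `# pylint: disable=unused-import` line mirrors the diff above: the import exists purely for its re-export side effect, which pylint would otherwise flag.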
[GitHub] [beam] tvalentyn commented on pull request #13193: [BEAM-7372][BEAM-8371][BEAM-9372] drop python 2.7 and 3.5 support from python container
tvalentyn commented on pull request #13193: URL: https://github.com/apache/beam/pull/13193#issuecomment-718224258 Run PythonDocker PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] pabloem commented on pull request #13201: Updating BigQuery client for Python
pabloem commented on pull request #13201: URL: https://github.com/apache/beam/pull/13201#issuecomment-718237863 Run Python_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] nehsyc commented on a change in pull request #13208: [BEAM-10703] Add an option to GroupIntoBatches to output ShardedKeys. Update Dataflow pipeline translation accordingly.
nehsyc commented on a change in pull request #13208: URL: https://github.com/apache/beam/pull/13208#discussion_r513774255 ## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java ## @@ -103,43 +156,76 @@ public void process(ProcessContext c) { } @Override -public PTransformReplacement>, PCollection>>> +public PTransformReplacement>, PCollection, Iterable>>> getReplacementTransform( AppliedPTransform< -PCollection>, PCollection>>, GroupIntoBatches> +PCollection>, +PCollection, Iterable>>, +GroupIntoBatches.WithShardedKey> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new StreamingGroupIntoBatches(runner, transform.getTransform())); + new StreamingGroupIntoBatchesWithShardedKey<>(runner, transform.getTransform())); } @Override public Map, ReplacementOutput> mapOutputs( -Map, PCollection> outputs, PCollection>> newOutput) { +Map, PCollection> outputs, +PCollection, Iterable>> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } } /** - * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The - * override does the same thing as the original transform but additionally record the input to add - * corresponding properties during the graph translation. + * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for unbounded Dataflow + * pipelines. The override does the same thing as the original transform but additionally records + * the input of {@code GroupIntoBatchesDoFn} in order to append relevant step properties during + * the graph translation. 
*/ - static class StreamingGroupIntoBatches - extends PTransform>, PCollection>>> { + static class StreamingGroupIntoBatchesWithShardedKey + extends PTransform>, PCollection, Iterable>>> { private final transient DataflowRunner runner; -private final GroupIntoBatches original; +private final GroupIntoBatches.WithShardedKey original; -public StreamingGroupIntoBatches(DataflowRunner runner, GroupIntoBatches original) { +public StreamingGroupIntoBatchesWithShardedKey( +DataflowRunner runner, GroupIntoBatches.WithShardedKey original) { this.runner = runner; this.original = original; } @Override -public PCollection>> expand(PCollection> input) { - runner.maybeRecordPCollectionWithAutoSharding(input); - return input.apply(original); +public PCollection, Iterable>> expand(PCollection> input) { + PCollection, V>> intermediate_input = ShardKeys(input); + + runner.maybeRecordPCollectionWithAutoSharding(intermediate_input); + + if (original.getMaxBufferingDuration() != null) { Review comment: We need to recognize the GroupIntoBatchesDoFn, which is a private member of the transform. Here we record the input PCollection of GroupIntoBatchesDoFn, which would be the output of the key-sharding DoFn, so in the translation we can append autosharding properties to those steps whose input was recorded. Indeed, this doesn't scale. Any suggestions?
[GitHub] [beam] rohdesamuel opened a new pull request #13214: Adds the ToString well-known transform URN
rohdesamuel opened a new pull request #13214: URL: https://github.com/apache/beam/pull/13214 This adds the ToString URN to the list of well-known transforms. The ToString transform allows runners to translate a given element into a human-readable string. This is useful for debugging, for building tools, or for runner-side features that generalize across SDKs and require element introspection. Design doc: https://docs.google.com/document/d/1v7iWj0LIum04mYwRM_Cvze915tATwmEzLrqj_uVBkCE/edit# Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
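To make the idea concrete: a well-known URN lets a runner substitute its own native implementation of a transform without understanding SDK-specific code. A minimal Python sketch of such a registry follows; the registry shape and helper names are illustrative, not Beam's actual internals, and the URN constant is shown only as an example of the naming convention.

```python
# Hypothetical runner-side registry mapping well-known transform URNs
# to native element-wise implementations.
TO_STRING_URN = "beam:transform:to_string:v1"  # illustrative URN constant

WELL_KNOWN_TRANSFORMS = {
    TO_STRING_URN: lambda element: str(element),
}

def apply_well_known(urn, elements):
    """Apply the native implementation registered for a URN to each element."""
    fn = WELL_KNOWN_TRANSFORMS[urn]
    return [fn(e) for e in elements]

print(apply_well_known(TO_STRING_URN, [1, 2.5, "x"]))
```

A runner that recognizes the URN can run the registered function directly instead of calling back into the SDK harness.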
Post-Commit Tests Status (on master branch): a table of Jenkins build-status badges for the Go, Java, and Python SDKs across the Dataflow, Flink, Samza, Spark, and Twister2 runners (hosted at https://ci-beam.apache.org).
[GitHub] [beam] tvalentyn commented on pull request #13193: [BEAM-7372][BEAM-8371][BEAM-9372] drop python 2.7 and 3.5 support from python container
tvalentyn commented on pull request #13193: URL: https://github.com/apache/beam/pull/13193#issuecomment-718220319 LGTM, thank you!
[GitHub] [beam] ibzib commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.
ibzib commented on pull request #13200: URL: https://github.com/apache/beam/pull/13200#issuecomment-718234442 @amaliujia Rules are fixed, PTAL
[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode
TheNeuralBit commented on a change in pull request #13139: URL: https://github.com/apache/beam/pull/13139#discussion_r513799533 ## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -619,21 +619,75 @@ def assign(self, **kwargs): "instances are supported.") return frame_base._elementwise_method('assign')(self, **kwargs) - apply = frame_base.not_implemented_method('apply') - explode = frame_base.not_implemented_method('explode') isin = frame_base.not_implemented_method('isin') append = frame_base.not_implemented_method('append') combine = frame_base.not_implemented_method('combine') combine_first = frame_base.not_implemented_method('combine_first') count = frame_base.not_implemented_method('count') - drop = frame_base.not_implemented_method('drop') eval = frame_base.not_implemented_method('eval') reindex = frame_base.not_implemented_method('reindex') melt = frame_base.not_implemented_method('melt') pivot = frame_base.not_implemented_method('pivot') pivot_table = frame_base.not_implemented_method('pivot_table') + @frame_base.args_to_kwargs(pd.DataFrame) + @frame_base.populate_defaults(pd.DataFrame) + def explode(self, column, ignore_index): +# ignoring the index will not preserve it +preserves = (partitionings.Nothing() if ignore_index + else partitionings.Singleton()) +return frame_base.DeferredFrame.wrap( +expressions.ComputedExpression( +'explode', +lambda df: df.explode(column, ignore_index), +[self._expr], +preserves_partition_by=preserves, +requires_partition_by=partitionings.Nothing())) + + + @frame_base.args_to_kwargs(pd.DataFrame) + @frame_base.populate_defaults(pd.DataFrame) + @frame_base.maybe_inplace + def drop(self, **kwargs): +labels = kwargs.get('labels', None) Review comment: Ah whoops. I switched it over to use args directly
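For context on the semantics being deferred above: `pandas.DataFrame.explode` expands each list-like entry of a column into its own row, and with `ignore_index=True` the result gets a fresh RangeIndex — which is why the deferred implementation cannot claim to preserve partitioning by index in that case. A small standalone pandas illustration (assuming pandas >= 1.1, where `ignore_index` was added):

```python
import pandas as pd

df = pd.DataFrame({"a": [[1, 2], [3]], "b": ["x", "y"]})

# Default: exploded rows keep their original index labels, so labels repeat.
kept = df.explode("a")
print(list(kept.index))   # original labels, duplicated per exploded row

# ignore_index=True discards the original index in favor of a new RangeIndex.
fresh = df.explode("a", ignore_index=True)
print(list(fresh.index))
```

Since the original labels are gone in the `ignore_index` case, an index-partitioned distributed frame can no longer rely on them, matching the `partitionings.Nothing()` choice in the diff.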
[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.
amaliujia commented on pull request #13200: URL: https://github.com/apache/beam/pull/13200#issuecomment-718242175 Overall LGTM. @kennknowles do you have any other comments besides the interface discussion?
[GitHub] [beam] codecov[bot] edited a comment on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
codecov[bot] edited a comment on pull request #12806: URL: https://github.com/apache/beam/pull/12806#issuecomment-692876818 # [Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=h1) Report > Merging [#12806](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **decrease** coverage by `42.05%`. > The diff coverage is `16.00%`. [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12806/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree)

```diff
@@             Coverage Diff             @@
##           master   #12806       +/-   ##
===========================================
- Coverage   82.48%   40.42%   -42.06%
===========================================
  Files         455      449        -6
  Lines       54876    53539     -1337
===========================================
- Hits        45266    21645    -23621
- Misses       9610    31894    +22284
```

| [Impacted Files](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=tree) | Coverage Δ | | |---|---|---| | [...on/apache\_beam/runners/dataflow/dataflow\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9kYXRhZmxvd19ydW5uZXIucHk=) | `19.08% <0.00%> (-57.85%)` | :arrow_down: | | [...python/apache\_beam/runners/direct/direct\_runner.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZGlyZWN0X3J1bm5lci5weQ==) | `32.55% <11.11%> (-61.16%)` | :arrow_down: | | [sdks/python/apache\_beam/io/gcp/pubsub.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL3B1YnN1Yi5weQ==) | `40.78% <20.00%> (-51.53%)` | :arrow_down: | | [...python/apache\_beam/examples/complete/distribopt.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvY29tcGxldGUvZGlzdHJpYm9wdC5weQ==) | `0.00% <0.00%> (-98.59%)` | :arrow_down: | |
[...dks/python/apache\_beam/transforms/create\_source.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jcmVhdGVfc291cmNlLnB5) | `0.00% <0.00%> (-98.19%)` | :arrow_down: | | [...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=) | `0.00% <0.00%> (-98.15%)` | :arrow_down: | | [...e\_beam/runners/interactive/testing/mock\_ipython.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS90ZXN0aW5nL21vY2tfaXB5dGhvbi5weQ==) | `7.14% <0.00%> (-92.86%)` | :arrow_down: | | [.../examples/snippets/transforms/elementwise/pardo.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9lbGVtZW50d2lzZS9wYXJkby5weQ==) | `11.36% <0.00%> (-88.64%)` | :arrow_down: | | [sdks/python/apache\_beam/typehints/opcodes.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL29wY29kZXMucHk=) | `0.00% <0.00%> (-87.92%)` | :arrow_down: | | [...s/snippets/transforms/aggregation/combineperkey.py](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvdHJhbnNmb3Jtcy9hZ2dyZWdhdGlvbi9jb21iaW5lcGVya2V5LnB5) | `11.95% <0.00%> (-86.96%)` | :arrow_down: | | ... and [290 more](https://codecov.io/gh/apache/beam/pull/12806/diff?src=pr&el=tree-more) | | -- [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=continue). > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? 
= missing data` > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=footer). Last update [376d2a6...e4095ad](https://codecov.io/gh/apache/beam/pull/12806?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [beam] nehsyc commented on a change in pull request #13208: [BEAM-10703] Add an option to GroupIntoBatches to output ShardedKeys. Update Dataflow pipeline translation accordingly.
nehsyc commented on a change in pull request #13208: URL: https://github.com/apache/beam/pull/13208#discussion_r513784255 ## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java ## @@ -92,9 +96,58 @@ public void process(ProcessContext c) { } } + static class BatchGroupIntoBatchesWithShardedKeyOverrideFactory Review comment: Two things made me add the override for `withShardedKeys`: - We insert an explicit GBK for batch stateful DoFns (although I feel this should be done in the backend): https://github.com/apache/beam/blob/562041956efeae2a186b1d815ea6bcd7d54682ae/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L228 - Currently the batch GroupIntoBatches doesn't use keyed state but instead partitions the iterables after the GBK: https://github.com/apache/beam/blob/562041956efeae2a186b1d815ea6bcd7d54682ae/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java#L84
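The second bullet — partitioning each key's grouped iterable into batches after the GBK, rather than buffering elements in keyed state — can be sketched in a few lines of plain Python. The names here are illustrative, not the Dataflow override's actual code:

```python
def batch_grouped_values(grouped, batch_size):
    """Given (key, iterable) pairs as produced by a GroupByKey, emit
    (key, batch) pairs where each batch has at most batch_size elements."""
    for key, values in grouped:
        batch = []
        for value in values:
            batch.append(value)
            if len(batch) == batch_size:
                yield key, batch
                batch = []
        if batch:  # emit the final, possibly smaller, batch
            yield key, batch

print(list(batch_grouped_values([("k", [1, 2, 3, 4, 5])], 2)))
```

This is cheap in batch mode because each key's values arrive as one iterable; the streaming path needs state and timers instead, which is what the sharded-key override addresses.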
[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.
amaliujia commented on a change in pull request #13200: URL: https://github.com/apache/beam/pull/13200#discussion_r513761419 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; +import org.apache.beam.sdk.transforms.Combine; + +/** + * Provider for user-defined functions written in Java. Implementations should be annotated with + * {@link com.google.auto.service.AutoService}. + */ +public interface UdfProvider { + /** Maps function names to scalar function implementations. */ + default Map userDefinedScalarFunctions() { Review comment: Per our offline chat, we can continue discussing what a good interface should be and this PR should not be blocked. We will mark this interface as experimental and refine when necessary.
[GitHub] [beam] boyuanzz merged pull request #13144: [BEAM-10475] Add max buffering duration option for GroupIntoBatches transform in Python
boyuanzz merged pull request #13144: URL: https://github.com/apache/beam/pull/13144
[GitHub] [beam] robertwb commented on pull request #13141: [BEAM-9547] Dataframe corrwith.
robertwb commented on pull request #13141: URL: https://github.com/apache/beam/pull/13141#issuecomment-718200546 Run PythonLint PreCommit
[GitHub] [beam] rohdesamuel commented on pull request #13210: [BEAM-10994] Update hot key detection log message
rohdesamuel commented on pull request #13210: URL: https://github.com/apache/beam/pull/13210#issuecomment-718194658 R: @pabloem
[GitHub] [beam] udim merged pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.
udim merged pull request #13213: URL: https://github.com/apache/beam/pull/13213
[GitHub] [beam] rohdesamuel commented on a change in pull request #13210: [BEAM-10994] Update hot key detection log message
rohdesamuel commented on a change in pull request #13210: URL: https://github.com/apache/beam/pull/13210#discussion_r513741622 ## File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/HotKeyLogger.java ## @@ -52,7 +52,9 @@ public void logHotKeyDetection(String userStepName, Duration hotKeyAge) { LOG.warn( "A hot key was detected in step '{}' with age of '{}'. This is " + "a symptom of key distribution being skewed. To fix, please inspect your data and " -+ "pipeline to ensure that elements are evenly distributed across your key space.", ++ "pipeline to ensure that elements are evenly distributed across your key space. If " ++ "you want to log the plain-text key to Cloud Logging please re-run with the " ++ "`hotKeyLoggingEnabled` pipeline option.", Review comment: Sure, I added a link to the Cloud docs about specifying pipeline options. This generalizes across different languages and is a good how-to guide.
[GitHub] [beam] udim commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.
udim commented on pull request #13213: URL: https://github.com/apache/beam/pull/13213#issuecomment-718184130 I believe this is a job setting, so the seed job needs to run to update it.
[GitHub] [beam] udim commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.
udim commented on pull request #13213: URL: https://github.com/apache/beam/pull/13213#issuecomment-718183902 run seed job
[GitHub] [beam] reuvenlax commented on pull request #12915: [BEAM-7386] Introduce temporal inner join.
reuvenlax commented on pull request #12915: URL: https://github.com/apache/beam/pull/12915#issuecomment-718179094 Sorry for the delay. AFAIK both this and the schema library are limited today to equijoins. The schema API is designed so that we can extend it later with non-equijoins; however, evaluating arbitrary join conditions in a distributed manner can be a hard problem.
[GitHub] [beam] tysonjh commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.
tysonjh commented on pull request #13213: URL: https://github.com/apache/beam/pull/13213#issuecomment-718174027 R: @udim The change didn't apply to the precommit run; it still shows the "No JDK named 'JDK 1.8 (latest)' found" error. Maybe it needs to be committed to take effect?
[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode
TheNeuralBit commented on a change in pull request #13139: URL: https://github.com/apache/beam/pull/13139#discussion_r513711027 ## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -1513,6 +1584,8 @@ def repeat(self, repeats): 'repeat', lambda series: series.str.repeat(repeats), [self._expr], + # Output will also be a str Series Review comment: Yeah, when I added the verification of the proxy in `TransformTest` I found that this proxy was incorrectly inferred as `bool` for the zipping case tested there. ```py In [10]: proxy.dtypes Out[10]: str object repeats int64 dtype: object In [11]: proxy.str.str.repeat(proxy.repeats) Out[11]: Series([], Name: str, dtype: bool) ``` The actual operation does produce `object` though: ```py In [13]: df.str.str.repeat(df.repeats) Out[13]: 0 AAA 1 B 2 3 D 4 EE Name: str, dtype: object ```
[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode
TheNeuralBit commented on a change in pull request #13139: URL: https://github.com/apache/beam/pull/13139#discussion_r513711378 ## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -1513,6 +1584,8 @@ def repeat(self, repeats): 'repeat', lambda series: series.str.repeat(repeats), [self._expr], + # Output will also be a str Series Review comment: Maybe something we should fix upstream and patch around here in the meantime?
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider
TheNeuralBit commented on a change in pull request #12780: URL: https://github.com/apache/beam/pull/12780#discussion_r513699837 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/RowToPubsubMessage.java ## @@ -57,14 +73,43 @@ public static RowToPubsubMessage withTimestampAttribute(boolean useTimestampAttr ? input.apply(WithTimestamps.of((row) -> row.getDateTime(TIMESTAMP_FIELD).toInstant())) : input; -return withTimestamp -.apply(DropFields.fields(TIMESTAMP_FIELD)) -.apply(ToJson.of()) -.apply( -MapElements.into(TypeDescriptor.of(PubsubMessage.class)) -.via( -(String json) -> -new PubsubMessage( -json.getBytes(StandardCharsets.ISO_8859_1), ImmutableMap.of(; +withTimestamp = withTimestamp.apply(DropFields.fields(TIMESTAMP_FIELD)); +switch (payloadFormat) { + case JSON: +return withTimestamp +.apply("MapRowToJsonString", ToJson.of()) +.apply("MapToJsonBytes", MapElements.via(new StringToBytes())) +.apply("MapToPubsubMessage", MapElements.via(new ToPubsubMessage())); + case AVRO: +return withTimestamp +.apply( +"MapRowToAvroBytes", + MapElements.via(AvroUtils.getRowToAvroBytesFunction(payloadSchema))) +.apply("MapToPubsubMessage", MapElements.via(new ToPubsubMessage())); + default: +throw new IllegalArgumentException("Unsupported payload format: " + payloadFormat); +} + } + + private static class StringToBytes extends SimpleFunction { +@Override +public byte[] apply(String s) { + return s.getBytes(ISO_8859_1); Review comment: I'm not sure why I opted to get ISO_8859_1 encoded bytes here... Could you change this to use UTF_8? 
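The encoding question matters because a JSON payload may contain characters outside Latin-1; ISO-8859-1 either fails on or mangles such text, while UTF-8 can represent any Unicode string. A quick standalone Python illustration of the difference the reviewer is pointing at:

```python
payload = '{"name": "日本"}'  # JSON text containing non-Latin-1 characters

# UTF-8 round-trips any Unicode text losslessly.
utf8_bytes = payload.encode("utf-8")
assert utf8_bytes.decode("utf-8") == payload

# ISO-8859-1 (Latin-1) cannot represent these characters at all.
try:
    payload.encode("iso-8859-1")
    print("encoded without error")
except UnicodeEncodeError:
    print("ISO-8859-1 cannot encode this payload")
```

(In Java, `String.getBytes(ISO_8859_1)` silently substitutes unmappable characters rather than throwing, which makes the corruption even easier to miss.)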
## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow.java ## @@ -175,11 +213,21 @@ public void processElement( private final boolean useDlq; +private final PayloadFormat payloadFormat; + private transient volatile @Nullable ObjectMapper objectMapper; -protected NestedSchemaPubsubMessageToRow(Schema messageSchema, boolean useDlq) { +private final SimpleFunction avroBytesToRowFn; + +private final Schema payloadSchema; + +protected NestedSchemaPubsubMessageToRow( +Schema messageSchema, boolean useDlq, PayloadFormat payloadFormat) { this.messageSchema = messageSchema; this.useDlq = useDlq; + this.payloadFormat = payloadFormat; + this.payloadSchema = messageSchema.getField(PAYLOAD_FIELD).getType().getRowSchema(); + this.avroBytesToRowFn = AvroUtils.getAvroBytesToRowFunction(payloadSchema); Review comment: Instead of eagerly generating the avroBytesToRowFn (even if we won't need it) and then branching on the payloadFormat for every element in parsePayload, we should instead generate a parsePayload function when it's needed. This could work similarly to what the current version does with `objectMapper`, except we would create and store a `Function`. In the JSON case we'd generate the ObjectMapper and wrap that in a function, and in the Avro case we'd call getAvroBytesToRowFunction and wrap it. ## File path: website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md ## @@ -251,6 +253,8 @@ TBLPROPERTIES '{"timestampAttributeKey": "key", "deadLetterQueue": "projects/[PR * `deadLetterQueue`: The topic into which messages are written if the payload was not parsed. If not specified, an exception is thrown for parsing failures. +* `format`: Optional. Allows you to specify the Pubsub payload format. +Possible values are {`json`, `avro`}. Defaults to `json`.
Review comment: :+1: thanks ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubAvroIT.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +
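The reviewer's suggestion above — build the parse function once, when the configured format is known, instead of branching on the format for every element — can be sketched in Python. The function names and the `"json"`/`"avro"` tags here are hypothetical stand-ins; the Java code would store a `Function<byte[], Row>` built from the ObjectMapper or from `getAvroBytesToRowFunction`:

```python
import json

def make_payload_parser(payload_format, avro_bytes_to_row=None):
    """Build the bytes -> record parser once, based on the configured format,
    so per-element processing never branches on the format."""
    if payload_format == "json":
        return lambda payload: json.loads(payload.decode("utf-8"))
    if payload_format == "avro":
        # Stand-in for wrapping an Avro bytes-to-row function; the caller
        # supplies it here because this sketch has no Avro dependency.
        return avro_bytes_to_row
    raise ValueError("Unsupported payload format: %s" % payload_format)

parse = make_payload_parser("json")
print(parse(b'{"id": 7}'))
```

The per-element hot path then just calls `parse(payload)`, mirroring how the existing code lazily creates and reuses `objectMapper`.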
[GitHub] [beam] youngoli commented on a change in pull request #13188: [BEAM-11108] Add a version of TextIO implemented via SDF.
youngoli commented on a change in pull request #13188: URL: https://github.com/apache/beam/pull/13188#discussion_r513702465 ## File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go ## @@ -63,10 +63,87 @@ func TestRestriction_EvenSplits(t *testing.T) { t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v or %v", split.Start, split.End, size, min, min+1) } - // Check: All elements are still in a split restrictions. This - // logic assumes that the splits are returned in order which - // isn't guaranteed by EvenSplits, but this check is way easier - // with the assumption. + // Check: All elements are still in a split restriction and + // the restrictions are in the appropriate ascending order. + if split.Start != prevEnd { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, split.Start) + } else { + prevEnd = split.End + } + } + if prevEnd != r.End { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, r.End) + } + }) + } +} + +// TestRestriction_SizedSplits tests various splits and checks that they all +// follow the contract for SizedSplits. This means that all restrictions match +// the given size unless it is a remainder, and that each element is present +// in the split restrictions. +func TestRestriction_SizedSplits(t *testing.T) { + tests := []struct { + rest Restriction + size int64 + want []Restriction + }{ + { + rest: Restriction{Start: 0, End: 11}, + size: 5, + want: []Restriction{{0, 5}, {5, 10}, {10, 11}}, + }, + { Review comment: Yeah, that seems like a good idea. I'll add one more case before merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] robertwb commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.
robertwb commented on a change in pull request #13175: URL: https://github.com/apache/beam/pull/13175#discussion_r513689796 ## File path: sdks/python/apache_beam/transforms/stats.py ## @@ -368,82 +383,129 @@ class PerKey(PTransform): weighted: (optional) if set to True, the transform returns weighted quantiles. The input PCollection is then expected to contain tuples of input values with the corresponding weight. + batch_input: (optional) if set to True, the transform expects each element +of input PCollection to be a batch. Provides a way to accumulate +multiple elements at a time more efficiently. """ -def __init__(self, num_quantiles, key=None, reverse=False, weighted=False): +def __init__( +self, +num_quantiles, +key=None, +reverse=False, +weighted=False, +batch_input=False): self._num_quantiles = num_quantiles self._key = key self._reverse = reverse self._weighted = weighted + self._batch_input = batch_input def expand(self, pcoll): return pcoll | CombinePerKey( ApproximateQuantilesCombineFn.create( num_quantiles=self._num_quantiles, key=self._key, reverse=self._reverse, - weighted=self._weighted)) + weighted=self._weighted, + batch_input=self._batch_input)) def display_data(self): return ApproximateQuantiles._display_data( num_quantiles=self._num_quantiles, key=self._key, reverse=self._reverse, - weighted=self._weighted) + weighted=self._weighted, + batch_input=self._batch_input) + + +class _QuantileSpec(object): + """Quantiles computation specifications.""" + def __init__(self, buffer_size, num_buffers, weighted, key, reverse): +# type: (int, int, bool, Any, bool) -> None +self.buffer_size = buffer_size +self.num_buffers = num_buffers +self.weighted = weighted +self.key = key +self.reverse = reverse + +# Used to sort tuples of values and weights. +self.weighted_key = None if key is None else (lambda x: key(x[0])) + +# Used to compare values. 
+if key is None and not reverse: Review comment: Nit: it'd be easier to read `if reverse and key is None` rather than having the extra negation in there. ## File path: sdks/python/apache_beam/transforms/stats.py ## @@ -61,30 +58,34 @@ K = typing.TypeVar('K') V = typing.TypeVar('V') +try: + import mmh3 # pylint: disable=import-error -def _get_default_hash_fn(): - """Returns either murmurhash or md5 based on installation.""" - try: -import mmh3 # pylint: disable=import-error + def _mmh3_hash(value): +# mmh3.hash64 returns two 64-bit unsigned integers +return mmh3.hash64(value, seed=0, signed=False)[0] + + _default_hash_fn = _mmh3_hash + _default_hash_fn_type = 'mmh3' +except ImportError: -def _mmh3_hash(value): - # mmh3.hash64 returns two 64-bit unsigned integers - return mmh3.hash64(value, seed=0, signed=False)[0] + def _md5_hash(value): +# md5 is a 128-bit hash, so we truncate the hexdigest (string of 32 +# hexadecimal digits) to 16 digits and convert to int to get the 64-bit +# integer fingerprint. +return int(hashlib.md5(value).hexdigest()[:16], 16) -return _mmh3_hash + _default_hash_fn = _md5_hash + _default_hash_fn_type = 'md5' - except ImportError: + +def _get_default_hash_fn(): + """Returns either murmurhash or md5 based on installation.""" + if _default_hash_fn_type == 'md5': logging.warning( 'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of' Review comment: Are there downsides to just making this a dependency? ## File path: sdks/python/apache_beam/transforms/stats.py ## @@ -368,82 +383,129 @@ class PerKey(PTransform): weighted: (optional) if set to True, the transform returns weighted quantiles. The input PCollection is then expected to contain tuples of input values with the corresponding weight. + batch_input: (optional) if set to True, the transform expects each element +of input PCollection to be a batch. Provides a way to accumulate +multiple elements at a time more efficiently. 
""" -def __init__(self, num_quantiles, key=None, reverse=False, weighted=False): +def __init__( +self, +num_quantiles, +key=None, +reverse=False, +weighted=False, +batch_input=False): self._num_quantiles = num_quantiles self._key = key self._reverse = reverse self._weighted = weighted + self._batch_input = batch_input def expand(self, pcoll): return pcoll | CombinePerKey( ApproximateQuantilesCombineFn.create( num_quantiles=self._num_quantiles,
[GitHub] [beam] robertwb commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode
robertwb commented on a change in pull request #13139: URL: https://github.com/apache/beam/pull/13139#discussion_r513682982 ## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -619,21 +619,75 @@ def assign(self, **kwargs): "instances are supported.") return frame_base._elementwise_method('assign')(self, **kwargs) - apply = frame_base.not_implemented_method('apply') - explode = frame_base.not_implemented_method('explode') isin = frame_base.not_implemented_method('isin') append = frame_base.not_implemented_method('append') combine = frame_base.not_implemented_method('combine') combine_first = frame_base.not_implemented_method('combine_first') count = frame_base.not_implemented_method('count') - drop = frame_base.not_implemented_method('drop') eval = frame_base.not_implemented_method('eval') reindex = frame_base.not_implemented_method('reindex') melt = frame_base.not_implemented_method('melt') pivot = frame_base.not_implemented_method('pivot') pivot_table = frame_base.not_implemented_method('pivot_table') + @frame_base.args_to_kwargs(pd.DataFrame) + @frame_base.populate_defaults(pd.DataFrame) + def explode(self, column, ignore_index): +# ignoring the index will not preserve it +preserves = (partitionings.Nothing() if ignore_index + else partitionings.Singleton()) +return frame_base.DeferredFrame.wrap( +expressions.ComputedExpression( +'explode', +lambda df: df.explode(column, ignore_index), +[self._expr], +preserves_partition_by=preserves, +requires_partition_by=partitionings.Nothing())) + + + @frame_base.args_to_kwargs(pd.DataFrame) + @frame_base.populate_defaults(pd.DataFrame) + @frame_base.maybe_inplace + def drop(self, **kwargs): +labels = kwargs.get('labels', None) Review comment: One danger here of using kwargs.get rather than letting it be a parameter is that you're hard-coding what all the defaults are (rather than using populate_defaults). 
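The hazard noted above, that `kwargs.get('labels', None)` silently hard-codes what pandas' defaults are, can be illustrated by harvesting defaults from a signature instead, which is presumably what `populate_defaults` exists for. The `drop` function below is a local stand-in with the defaults pandas documents for `DataFrame.drop`; the real ones live in pandas and can drift between releases.

```python
import inspect

# Stand-in for pd.DataFrame.drop with its documented defaults.
def drop(labels=None, axis=0, index=None, columns=None,
         level=None, inplace=False, errors='raise'):
    pass

# Harvesting defaults from the signature keeps them in one place,
# rather than re-stating each one at a kwargs.get(...) call site.
defaults = {
    name: p.default
    for name, p in inspect.signature(drop).parameters.items()
    if p.default is not inspect.Parameter.empty
}
print(defaults['errors'])  # raise
```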
## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -1513,6 +1584,8 @@ def repeat(self, repeats): 'repeat', lambda series: series.str.repeat(repeats), [self._expr], + # Output will also be a str Series Review comment: Is there a drawback to this being automatically inferred? (Or was it not?) Same below. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on pull request #13105: [BEAM-10940] Add sdf initiated checkpoint support to portable Flink.
boyuanzz commented on pull request #13105: URL: https://github.com/apache/beam/pull/13105#issuecomment-718139268 Run Java Flink PortableValidatesRunner Streaming
[GitHub] [beam] pabloem commented on pull request #12960: [BEAM-9804] Allow user configuration of BigQuery temporary dataset
pabloem commented on pull request #12960: URL: https://github.com/apache/beam/pull/12960#issuecomment-718135801 Run Python_PVR_Flink PreCommit
[GitHub] [beam] robertwb commented on a change in pull request #13208: [BEAM-10703] Add an option to GroupIntoBatches to output ShardedKeys. Update Dataflow pipeline translation accordingly.
robertwb commented on a change in pull request #13208: URL: https://github.com/apache/beam/pull/13208#discussion_r513671483 ## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java ## @@ -103,43 +156,76 @@ public void process(ProcessContext c) { } @Override -public PTransformReplacement>, PCollection>>> +public PTransformReplacement>, PCollection, Iterable>>> getReplacementTransform( AppliedPTransform< -PCollection>, PCollection>>, GroupIntoBatches> +PCollection>, +PCollection, Iterable>>, +GroupIntoBatches.WithShardedKey> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new StreamingGroupIntoBatches(runner, transform.getTransform())); + new StreamingGroupIntoBatchesWithShardedKey<>(runner, transform.getTransform())); } @Override public Map, ReplacementOutput> mapOutputs( -Map, PCollection> outputs, PCollection>> newOutput) { +Map, PCollection> outputs, +PCollection, Iterable>> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } } /** - * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The - * override does the same thing as the original transform but additionally record the input to add - * corresponding properties during the graph translation. + * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for unbounded Dataflow + * pipelines. The override does the same thing as the original transform but additionally records + * the input of {@code GroupIntoBatchesDoFn} in order to append relevant step properties during + * the graph translation. 
*/ - static class StreamingGroupIntoBatches - extends PTransform>, PCollection>>> { + static class StreamingGroupIntoBatchesWithShardedKey + extends PTransform>, PCollection, Iterable>>> { private final transient DataflowRunner runner; -private final GroupIntoBatches original; +private final GroupIntoBatches.WithShardedKey original; -public StreamingGroupIntoBatches(DataflowRunner runner, GroupIntoBatches original) { +public StreamingGroupIntoBatchesWithShardedKey( +DataflowRunner runner, GroupIntoBatches.WithShardedKey original) { this.runner = runner; this.original = original; } @Override -public PCollection>> expand(PCollection> input) { - runner.maybeRecordPCollectionWithAutoSharding(input); - return input.apply(original); +public PCollection, Iterable>> expand(PCollection> input) { + PCollection, V>> intermediate_input = ShardKeys(input); + + runner.maybeRecordPCollectionWithAutoSharding(intermediate_input); + + if (original.getMaxBufferingDuration() != null) { Review comment: This doesn't look like it'll scale if more options are used. Why not just apply original? 
## File path: runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java ## @@ -103,43 +156,76 @@ public void process(ProcessContext c) { } @Override -public PTransformReplacement>, PCollection>>> +public PTransformReplacement>, PCollection, Iterable>>> getReplacementTransform( AppliedPTransform< -PCollection>, PCollection>>, GroupIntoBatches> +PCollection>, +PCollection, Iterable>>, +GroupIntoBatches.WithShardedKey> transform) { return PTransformReplacement.of( PTransformReplacements.getSingletonMainInput(transform), - new StreamingGroupIntoBatches(runner, transform.getTransform())); + new StreamingGroupIntoBatchesWithShardedKey<>(runner, transform.getTransform())); } @Override public Map, ReplacementOutput> mapOutputs( -Map, PCollection> outputs, PCollection>> newOutput) { +Map, PCollection> outputs, +PCollection, Iterable>> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } } /** - * Specialized implementation of {@link GroupIntoBatches} for unbounded Dataflow pipelines. The - * override does the same thing as the original transform but additionally record the input to add - * corresponding properties during the graph translation. + * Specialized implementation of {@link GroupIntoBatches.WithShardedKey} for unbounded Dataflow + * pipelines. The override does the same thing as the original transform but additionally records + * the input of {@code GroupIntoBatchesDoFn} in order to append relevant step properties during + * the graph translation. */ - static
[GitHub] [beam] tysonjh commented on pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.
tysonjh commented on pull request #13213: URL: https://github.com/apache/beam/pull/13213#issuecomment-718134478 Run Java PreCommit
[GitHub] [beam] lostluck commented on a change in pull request #13188: [BEAM-11108] Add a version of TextIO implemented via SDF.
lostluck commented on a change in pull request #13188: URL: https://github.com/apache/beam/pull/13188#discussion_r513678887 ## File path: sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go ## @@ -63,10 +63,87 @@ func TestRestriction_EvenSplits(t *testing.T) { t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v or %v", split.Start, split.End, size, min, min+1) } - // Check: All elements are still in a split restrictions. This - // logic assumes that the splits are returned in order which - // isn't guaranteed by EvenSplits, but this check is way easier - // with the assumption. + // Check: All elements are still in a split restriction and + // the restrictions are in the appropriate ascending order. + if split.Start != prevEnd { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, split.Start) + } else { + prevEnd = split.End + } + } + if prevEnd != r.End { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, r.End) + } + }) + } +} + +// TestRestriction_SizedSplits tests various splits and checks that they all +// follow the contract for SizedSplits. This means that all restrictions match +// the given size unless it is a remainder, and that each element is present +// in the split restrictions. +func TestRestriction_SizedSplits(t *testing.T) { + tests := []struct { + rest Restriction + size int64 + want []Restriction + }{ + { + rest: Restriction{Start: 0, End: 11}, + size: 5, + want: []Restriction{{0, 5}, {5, 10}, {10, 11}}, + }, + { Review comment: Consider adding an exact case too eg. Start 7, End 17 size 5 {7, 12}, {12, 17} IIUC the implementation correctly. If I understand the This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
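The contract the test above checks, fixed-size splits with a possible smaller remainder, reduces to a few lines. This is a Python restatement of that contract for clarity, not the Beam Go implementation; it includes the exact-division case the reviewer suggests adding.

```python
def sized_splits(start, end, size):
    """Split [start, end) into chunks of `size`; only the final
    chunk may be a smaller remainder."""
    splits, s = [], start
    while s < end:
        splits.append((s, min(s + size, end)))
        s += size
    return splits

print(sized_splits(0, 11, 5))  # [(0, 5), (5, 10), (10, 11)]
print(sized_splits(7, 17, 5))  # [(7, 12), (12, 17)] -- divides exactly
```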
[GitHub] [beam] kennknowles commented on pull request #13134: [BEAM-10402] Enable checkerframework globally
kennknowles commented on pull request #13134: URL: https://github.com/apache/beam/pull/13134#issuecomment-718132606 Run Portable_Python PreCommit
[GitHub] [beam] tysonjh opened a new pull request #13213: [INFRA-20858] Update JDK name to match Jenkins.
tysonjh opened a new pull request #13213: URL: https://github.com/apache/beam/pull/13213 See: https://issues.apache.org/jira/browse/INFRA-20858 https://lists.apache.org/thread.html/rb4c2834b9874b9f4a74c528de9055958483d2bc6e62c3464bc5c053f%40%3Cbuilds.apache.org%3E
[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn
pabloem commented on pull request #13154: URL: https://github.com/apache/beam/pull/13154#issuecomment-718126946 Run Python 3.8 PostCommit
[GitHub] [beam] pabloem commented on pull request #13154: Implementing Python Bounded Source Reader DoFn
pabloem commented on pull request #13154: URL: https://github.com/apache/beam/pull/13154#issuecomment-718126843 > I'm curious do we have a plan to build actual SDF for BQ instead of still relying on BoundedSource implementation? In this case, we will have a simple DoFn that _starts_ the read from BQ, but it eventually returns multiple Avro file sources that can be read individually. This is different from what we had before, where all of the BQ reading logic was part of a BoundedSource. In fact, the _CustomBigQuerySource will be removed eventually.
[GitHub] [beam] pabloem commented on a change in pull request #13154: Implementing Python Bounded Source Reader DoFn
pabloem commented on a change in pull request #13154: URL: https://github.com/apache/beam/pull/13154#discussion_r513670976 ## File path: sdks/python/apache_beam/io/iobase.py ## @@ -1618,3 +1628,48 @@ def display_data(self): 'source': DisplayDataItem(self.source.__class__, label='Read Source'), 'source_dd': self.source } + + +class SDFBoundedSourceReader(PTransform): Review comment: I've done this - but I've still allowed the source to come in via the constructor as well as an input. The intention of doing this is to keep the display data for simple Read transforms where the source is known at construction time.
[GitHub] [beam] yifanmai commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK
yifanmai commented on a change in pull request #13048: URL: https://github.com/apache/beam/pull/13048#discussion_r513623504 ## File path: CHANGES.md ## @@ -62,6 +62,7 @@ ## New Features / Improvements * Added support for avro payload format in Beam SQL Kafka Table ([BEAM-10885](https://issues.apache.org/jira/browse/BEAM-10885)) +* Added CombineFn.setup and CombineFn.teardown to Python SDK. These methods let you initialize a state before any of the other methods of the CombineFn is executed and clean that state up later on. ([BEAM-3736](https://issues.apache.org/jira/browse/BEAM-3736)) Review comment: nit: 'a state' -> 'the CombineFn's state' ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -1975,10 +1990,14 @@ def add_input_types(transform): return combined if self.has_defaults: - combine_fn = ( - self.fn if isinstance(self.fn, CombineFn) else - CombineFn.from_callable(self.fn)) - default_value = combine_fn.apply([], *self.args, **self.kwargs) + combine_fn = copy.deepcopy( + self.fn if isinstance(self.fn, CombineFn) else CombineFn. Review comment: nit: this can be `copy.deepcopy(self.fn) if...` i.e. copy is only needed in the first branch ## File path: sdks/python/apache_beam/transforms/core.py ## @@ -877,17 +877,19 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn): combining process proceeds as follows: 1. Input values are partitioned into one or more batches. - 2. For each batch, the create_accumulator method is invoked to create a fresh + 2. For each batch, the setup method is invoked. + 3. For each batch, the create_accumulator method is invoked to create a fresh initial "accumulator" value representing the combination of zero values. - 3. For each input value in the batch, the add_input method is invoked to + 4. For each input value in the batch, the add_input method is invoked to combine more values with the accumulator for that batch. - 4. The merge_accumulators method is invoked to combine accumulators from + 5. 
The merge_accumulators method is invoked to combine accumulators from separate batches into a single combined output accumulator value, once all of the accumulators have had all the input value in their batches added to them. This operation is invoked repeatedly, until there is only one accumulator value left. - 5. The extract_output operation is invoked on the final accumulator to get + 6. The extract_output operation is invoked on the final accumulator to get the output value. + 7. The teardown method is invoked. Review comment: Question: What is the expected behavior if setup throws an exception? Should teardown still be called? ## File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ## @@ -411,6 +411,33 @@ def visit_transform(self, transform_node): return FlattenInputVisitor() + @staticmethod + def combinefn_visitor(): +# Imported here to avoid circular dependencies. +from apache_beam.pipeline import PipelineVisitor +from apache_beam import core + +class CombineFnVisitor(PipelineVisitor): + """Checks if `CombineFn` has non-default setup or teardown methods. + If yes, raises `ValueError`. + """ + def visit_transform(self, applied_transform): +transform = applied_transform.transform +if isinstance(transform, core.ParDo) and isinstance( +transform.fn, core.CombineValuesDoFn): + if self._overrides_setup_or_teardown(transform.fn.combinefn): +raise ValueError( +'CombineFn.setup and CombineFn.teardown are ' +'not supported with non-portable Dataflow ' +'runner. Please use Dataflow Runner V2 instead.') Review comment: Question: Is there any plan to support this in non-portable Dataflow Runner, or will this be a V2 feature only? ## File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py ## @@ -0,0 +1,131 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +from typing import Set
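The numbered lifecycle in the docstring diff above can be traced end to end with a plain-Python stand-in. Method names mirror Beam's `CombineFn`, but this is a minimal sketch of the documented call order, not the Beam API itself.

```python
class MeanCombineFn:
    """Traces the documented order: setup -> create_accumulator ->
    add_input* -> merge_accumulators -> extract_output -> teardown."""

    def setup(self):
        self.resource = object()   # stand-in for expensive initialization

    def create_accumulator(self):
        return (0.0, 0)            # (running sum, count)

    def add_input(self, acc, value):
        return (acc[0] + value, acc[1] + 1)

    def merge_accumulators(self, accs):
        return (sum(a[0] for a in accs), sum(a[1] for a in accs))

    def extract_output(self, acc):
        return acc[0] / acc[1] if acc[1] else float('nan')

    def teardown(self):
        self.resource = None       # release what setup acquired

fn = MeanCombineFn()
fn.setup()
acc = fn.create_accumulator()
for v in (1, 2, 3, 4):
    acc = fn.add_input(acc, v)
result = fn.extract_output(fn.merge_accumulators([acc]))
fn.teardown()
print(result)  # 2.5
```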
[GitHub] [beam] robertwb commented on a change in pull request #13141: [BEAM-9547] Dataframe corrwith.
robertwb commented on a change in pull request #13141: URL: https://github.com/apache/beam/pull/13141#discussion_r513666115 ## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -575,6 +575,8 @@ def __setitem__(self, key, value): else: raise NotImplementedError(key) + align = frame_base._elementwise_method('align') Review comment: Good call. Done. ## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -771,6 +773,62 @@ def fill_matrix(*args): requires_partition_by=partitionings.Singleton(), proxy=proxy)) + @frame_base.args_to_kwargs(pd.DataFrame) + @frame_base.populate_defaults(pd.DataFrame) + def corrwith(self, other, axis, **kwargs): +if axis not in (0, 'index'): + raise NotImplementedError('corrwith(axis=%r)' % axis) +if not isinstance(other, frame_base.DeferredFrame): + other = frame_base.DeferredFrame.wrap( + expressions.ConstantExpression(other)) + +if isinstance(other, DeferredSeries): + proxy = self._expr.proxy().corrwith(other._expr.proxy()) + self, other = self.align(other, axis=0, join='inner') + corrs = [self[col].corr(other, **kwargs) for col in proxy.index] + def fill_dataframe(*args): +result = proxy.copy(deep=True) +for col, value in zip(proxy.index, args): + result[col] = value +return result + with expressions.allow_non_parallel_operations(True): +return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( +'fill_dataframe', +fill_dataframe, +[corr._expr for corr in corrs], +requires_partition_by=partitionings.Singleton(), +proxy=proxy)) + +elif isinstance(other, DeferredDataFrame): + proxy = self._expr.proxy().corrwith(other._expr.proxy()) + self, other = self.align(other, axis=0, join='inner') + valid_cols = list( + set(self.columns) + .intersection(other.columns) + .intersection(proxy.index)) + corrs = [self[col].corr(other[col], **kwargs) for col in valid_cols] + def fill_dataframe(*args): +result = proxy.copy(deep=True) +for col, value in zip(valid_cols, args): + result[col] = value +return result + with 
expressions.allow_non_parallel_operations(True): +return frame_base.DeferredFrame.wrap( + expressions.ComputedExpression( +'fill_dataframe', +fill_dataframe, +[corr._expr for corr in corrs], +requires_partition_by=partitionings.Singleton(), +proxy=proxy)) Review comment: I've consolidated them now. ## File path: sdks/python/apache_beam/dataframe/frames.py ## @@ -771,6 +773,62 @@ def fill_matrix(*args): requires_partition_by=partitionings.Singleton(), proxy=proxy)) + @frame_base.args_to_kwargs(pd.DataFrame) + @frame_base.populate_defaults(pd.DataFrame) + def corrwith(self, other, axis, **kwargs): +if axis not in (0, 'index'): + raise NotImplementedError('corrwith(axis=%r)' % axis) +if not isinstance(other, frame_base.DeferredFrame): + other = frame_base.DeferredFrame.wrap( + expressions.ConstantExpression(other)) + +if isinstance(other, DeferredSeries): + proxy = self._expr.proxy().corrwith(other._expr.proxy()) + self, other = self.align(other, axis=0, join='inner') + corrs = [self[col].corr(other, **kwargs) for col in proxy.index] Review comment: Resolved. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
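Each deferred `self[col].corr(other)` call in the diff above computes an ordinary Pearson correlation for one column; `corrwith` then assembles one such value per shared column. A stdlib-only sketch of that per-column computation (not the pandas or Beam implementation):

```python
import math

def pearson(xs, ys):
    # Ordinary Pearson correlation coefficient for two equal-length
    # sequences: covariance over the product of standard deviations.
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

# One correlation per column against a shared series, as corrwith does:
frame = {'a': [1, 2, 3, 4], 'b': [4, 3, 2, 1]}
other = [1, 2, 3, 4]
corrs = {col: pearson(vals, other) for col, vals in frame.items()}
print(corrs)  # {'a': 1.0, 'b': -1.0}
```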
[GitHub] [beam] boyuanzz commented on pull request #12806: [BEAM-10869] Make WriteToPubsub output serialized PubsubMessage proto bytes when using runner v2
boyuanzz commented on pull request #12806: URL: https://github.com/apache/beam/pull/12806#issuecomment-718116388 Run Python PreCommit
[GitHub] [beam] pabloem commented on pull request #12960: [BEAM-9804] Allow user configuration of BigQuery temporary dataset
pabloem commented on pull request #12960: URL: https://github.com/apache/beam/pull/12960#issuecomment-718112424 Run Python_PVR_Flink PreCommit
[GitHub] [beam] lostluck commented on pull request #13209: [BEAM-9615] Add schema coders and tests.
lostluck commented on pull request #13209: URL: https://github.com/apache/beam/pull/13209#issuecomment-718112331 Run Go PostCommit
[GitHub] [beam] TheNeuralBit commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider
TheNeuralBit commented on a change in pull request #12780: URL: https://github.com/apache/beam/pull/12780#discussion_r513654019

## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubAvroIT.java
## @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.hamcrest.Matcher;
+import org.joda.time.Instant;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for querying Pubsub AVRO messages with SQL. */
+@RunWith(JUnit4.class)
+public class PubsubAvroIT extends PubsubTableProviderIT {
+  private static final Schema NAME_HEIGHT_KNOWS_JS_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
+          .build();
+
+  private static final Schema NAME_HEIGHT_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .build();
+
+  @Override
+  protected String getPayloadFormat() {
+    return "avro";
+  }
+
+  @Override
+  protected PCollection<String> applyRowsToStrings(PCollection<Row> rows) {
+    return rows.apply(
+        MapElements.into(TypeDescriptors.strings())
+            .via(
+                (Row row) ->
+                    new String(
+                        AvroUtils.getRowToAvroBytesFunction(row.getSchema()).apply(row), UTF_8)));
+  }
+
+  @Override
+  protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
+    Row row = row(PAYLOAD_SCHEMA, id, name);
+    return message(timestamp, AvroUtils.getRowToAvroBytesFunction(PAYLOAD_SCHEMA).apply(row));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNames(String name) {
+    Schema schema = Schema.builder().addStringField("name").build();
+    Row row = row(schema, name);
+    return hasProperty("payload", equalTo(AvroUtils.getRowToAvroBytesFunction(schema).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) {
+    Row row = row(NAME_HEIGHT_SCHEMA, name, height);
+    return hasProperty(
+        "payload", equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_SCHEMA).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
+      String name, int height, boolean knowsJS) {
+    Row row = row(NAME_HEIGHT_KNOWS_JS_SCHEMA, name, height, knowsJS);
+    return hasProperty(
+        "payload",
+        equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_KNOWS_JS_SCHEMA).apply(row)));

Review comment: We should do this for the other `matcher*` methods as well.
[GitHub] [beam] boyuanzz commented on pull request #13144: [BEAM-10475] Add max buffering duration option for GroupIntoBatches transform in Python
boyuanzz commented on pull request #13144: URL: https://github.com/apache/beam/pull/13144#issuecomment-718106117 Per https://issues.apache.org/jira/browse/BEAM-10921, we can ignore the windows-latest test suite.
[GitHub] [beam] boyuanzz commented on pull request #13144: [BEAM-10475] Add max buffering duration option for GroupIntoBatches transform in Python
boyuanzz commented on pull request #13144: URL: https://github.com/apache/beam/pull/13144#issuecomment-718092303 Run Python_PVR_Flink PreCommit
[GitHub] [beam] TheNeuralBit merged pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider
TheNeuralBit merged pull request #12839: URL: https://github.com/apache/beam/pull/12839
[GitHub] [beam] robertwb merged pull request #13077: [BEAM-9561] Dataframe test infrastructure improvements.
robertwb merged pull request #13077: URL: https://github.com/apache/beam/pull/13077
[GitHub] [beam] yifanmai commented on pull request #13202: WIP: Set parent of fused stages to the lowest common ancestor
yifanmai commented on pull request #13202: URL: https://github.com/apache/beam/pull/13202#issuecomment-718087661 Flink is still unhappy with this change. I will investigate further.
[GitHub] [beam] dhruvesh09 closed pull request #13171: Added check for alter_label_if_ipython when input PCollection is an empty tuple or dictionary.
dhruvesh09 closed pull request #13171: URL: https://github.com/apache/beam/pull/13171
[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.
iindyk commented on pull request #13175: URL: https://github.com/apache/beam/pull/13175#issuecomment-717978836 retest this please
[GitHub] [beam] tszerszen edited a comment on pull request #13142: [BEAM-5939] - Deduplicate constants
tszerszen edited a comment on pull request #13142: URL: https://github.com/apache/beam/pull/13142#issuecomment-717849131 @tvalentyn thank you for your review, could you please take a look after recent changes?
[GitHub] [beam] tszerszen edited a comment on pull request #13142: [BEAM-5939] - Deduplicate constants
tszerszen edited a comment on pull request #13142: URL: https://github.com/apache/beam/pull/13142#issuecomment-717849131 @tvalentyn could you please take a look after recent changes?
[GitHub] [beam] tszerszen commented on pull request #13142: [BEAM-5939] - Deduplicate constants
tszerszen commented on pull request #13142: URL: https://github.com/apache/beam/pull/13142#issuecomment-717849131 @tvalentyn could you please take a look now?
[GitHub] [beam] piotr-szuberski removed a comment on pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider
piotr-szuberski removed a comment on pull request #12780: URL: https://github.com/apache/beam/pull/12780#issuecomment-717797979 Run SQL PostCommit
[GitHub] [beam] piotr-szuberski commented on pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider
piotr-szuberski commented on pull request #12780: URL: https://github.com/apache/beam/pull/12780#issuecomment-717822443 Run SQL PostCommit
[GitHub] [beam] piotr-szuberski commented on pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider
piotr-szuberski commented on pull request #12780: URL: https://github.com/apache/beam/pull/12780#issuecomment-717797979 Run SQL PostCommit
[GitHub] [beam] piotr-szuberski commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider
piotr-szuberski commented on a change in pull request #12780: URL: https://github.com/apache/beam/pull/12780#discussion_r513281422

## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubAvroIT.java
## @@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasProperty;
+
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.hamcrest.Matcher;
+import org.joda.time.Instant;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for querying Pubsub AVRO messages with SQL. */
+@RunWith(JUnit4.class)
+public class PubsubAvroIT extends PubsubTableProviderIT {
+  private static final Schema NAME_HEIGHT_KNOWS_JS_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
+          .build();
+
+  private static final Schema NAME_HEIGHT_SCHEMA =
+      Schema.builder()
+          .addNullableField("name", Schema.FieldType.STRING)
+          .addNullableField("height", Schema.FieldType.INT32)
+          .build();
+
+  @Override
+  protected String getPayloadFormat() {
+    return "avro";
+  }
+
+  @Override
+  protected PCollection<String> applyRowsToStrings(PCollection<Row> rows) {
+    return rows.apply(
+        MapElements.into(TypeDescriptors.strings())
+            .via(
+                (Row row) ->
+                    new String(
+                        AvroUtils.getRowToAvroBytesFunction(row.getSchema()).apply(row), UTF_8)));
+  }
+
+  @Override
+  protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
+    Row row = row(PAYLOAD_SCHEMA, id, name);
+    return message(timestamp, AvroUtils.getRowToAvroBytesFunction(PAYLOAD_SCHEMA).apply(row));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNames(String name) {
+    Schema schema = Schema.builder().addStringField("name").build();
+    Row row = row(schema, name);
+    return hasProperty("payload", equalTo(AvroUtils.getRowToAvroBytesFunction(schema).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) {
+    Row row = row(NAME_HEIGHT_SCHEMA, name, height);
+    return hasProperty(
+        "payload", equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_SCHEMA).apply(row)));
+  }
+
+  @Override
+  protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
+      String name, int height, boolean knowsJS) {
+    Row row = row(NAME_HEIGHT_KNOWS_JS_SCHEMA, name, height, knowsJS);
+    return hasProperty(
+        "payload",
+        equalTo(AvroUtils.getRowToAvroBytesFunction(NAME_HEIGHT_KNOWS_JS_SCHEMA).apply(row)));

Review comment: Done.
[GitHub] [beam] piotr-szuberski commented on pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider
piotr-szuberski commented on pull request #12839: URL: https://github.com/apache/beam/pull/12839#issuecomment-717789503 Run SQL PostCommit
[GitHub] [beam] piotr-szuberski removed a comment on pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider
piotr-szuberski removed a comment on pull request #12839: URL: https://github.com/apache/beam/pull/12839#issuecomment-717748971 Run SQL PostCommit
[GitHub] [beam] piotr-szuberski commented on a change in pull request #12780: [BEAM-5504] Add Avro support to Pubsub table provider
piotr-szuberski commented on a change in pull request #12780: URL: https://github.com/apache/beam/pull/12780#discussion_r513235208

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow.java
## @@ -226,11 +260,11 @@ public void processElement(
                         field, timestamp, element.getAttributeMap(), payload))
                 .collect(toList());
         o.get(MAIN_TAG).output(Row.withSchema(messageSchema).addValues(values).build());
-      } catch (UnsupportedRowJsonException jsonException) {
+      } catch (UnsupportedRowJsonException | AvroRuntimeException exception) {

Review comment: Done

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PayloadFormat.java
## @@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+public enum PayloadFormat {
+  JSON,
+  AVRO
+}

Review comment: Done.

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageToRow.java
## @@ -226,11 +260,11 @@ public void processElement(
                         field, timestamp, element.getAttributeMap(), payload))
                 .collect(toList());
         o.get(MAIN_TAG).output(Row.withSchema(messageSchema).addValues(values).build());
-      } catch (UnsupportedRowJsonException jsonException) {
+      } catch (UnsupportedRowJsonException | AvroRuntimeException exception) {

Review comment: Done.
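The PubsubMessageToRow hunk above widens the catch so that both JSON and Avro decode failures are routed to the error output instead of failing the bundle. A rough Python sketch of that main/error split for JSON payloads (hypothetical names; the real DoFn emits to tagged outputs rather than returning lists):

```python
import json

def payloads_to_rows(payloads, schema_fields):
    # Payloads that parse and contain every schema field go to `main`;
    # malformed or incomplete payloads go to `error` with the reason,
    # analogous to MAIN_TAG vs. the dead-letter output in PubsubMessageToRow.
    main, error = [], []
    for raw in payloads:
        try:
            obj = json.loads(raw)
            main.append({f: obj[f] for f in schema_fields})  # KeyError if a field is missing
        except (ValueError, KeyError) as exc:
            error.append((raw, repr(exc)))
    return main, error
```

Catching the union of decoder exceptions in one place, as the diff does, keeps the dead-letter path format-agnostic when new payload formats (here, Avro) are added.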
[GitHub] [beam] piotr-szuberski commented on pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider
piotr-szuberski commented on pull request #12839: URL: https://github.com/apache/beam/pull/12839#issuecomment-717748922
[GitHub] [beam] piotr-szuberski commented on a change in pull request #12839: [BEAM-10893] Add Json support to Kafka Table Provider
piotr-szuberski commented on a change in pull request #12839: URL: https://github.com/apache/beam/pull/12839#discussion_r513222917

## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaJsonTable.java
## @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.util.RowJsonUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+public class BeamKafkaJsonTable extends BeamKafkaTable {
+  public BeamKafkaJsonTable(Schema beamSchema, String bootstrapServers, List<String> topics) {
+    super(beamSchema, bootstrapServers, topics);
+  }
+
+  @Override
+  public PTransform<PCollection<KV<byte[], byte[]>>, PCollection<Row>> getPTransformForInput() {
+    ObjectMapper objectMapper =
+        RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(schema));
+    return new BeamKafkaJsonTable.JsonRecorderDecoder(schema, objectMapper);
+  }
+
+  @Override
+  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> getPTransformForOutput() {
+    ObjectMapper objectMapper =
+        RowJsonUtils.newObjectMapperWith(RowJson.RowJsonSerializer.forSchema(schema));
+    return new BeamKafkaJsonTable.JsonRecorderEncoder(objectMapper);
+  }
+
+  /** A PTransform to convert {@code KV<byte[], byte[]>} to {@link Row}. */
+  public static class JsonRecorderDecoder

Review comment: Done
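BeamKafkaJsonTable pairs a decoder (Kafka record value bytes → Row) with an encoder (Row → record value bytes), both derived from the table's schema. A small stand-alone Python sketch of that round trip, using plain dicts in place of Beam Rows (hypothetical names, not the Beam API):

```python
import json

def decode_record(value_bytes, schema_fields):
    # Kafka record value (UTF-8 JSON bytes) -> "row" dict restricted to the
    # schema fields, loosely mirroring JsonRecorderDecoder.
    obj = json.loads(value_bytes.decode('utf-8'))
    return {f: obj[f] for f in schema_fields}

def encode_row(row):
    # "row" dict -> UTF-8 JSON bytes for the Kafka record value,
    # loosely mirroring JsonRecorderEncoder.
    return json.dumps(row, sort_keys=True).encode('utf-8')
```

The schema-driven symmetry matters: anything the table writes with the encoder must be readable by the decoder of a table declared with the same schema.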