Re: [PR] [flink] #31390 emit watermark with empty source [beam]
je-ik commented on PR #31391:
URL: https://github.com/apache/beam/pull/31391#issuecomment-2130976269

> * This sounds similar to [[runners-flink] Fix watermark emission for empty splits (#29816) #30969](https://github.com/apache/beam/pull/30969), what is the difference here ?

The fix in #30969 was related, but different. A source can be empty _temporarily_ or _permanently_. A source that is empty forever signals this by moving its watermark to infinity. The split can then be closed, which moves the watermark, because a closed split no longer holds it. This PR fixes the other case: the source is not emitting any data but _does not_ move the watermark to infinity, and instead relies on some idle source policy. Before this PR, no watermark was emitted downstream _until at least one element was emitted from the source_. This is fixed now.

> * I also observed similar issue on JmsIO on Dataflow runner ("watermark does not increase when there is no incoming data for a while") and the fix [[DRAFT] Attempt fix Jms watermark #30337](https://github.com/apache/beam/pull/30337) didn't work. I am wondering if [[Bug]: FlinkRunner does not emit watermark with empty source #31390](https://github.com/apache/beam/issues/31390) is generic at SDK level and a fix could be posed in general ?

All these fixes relate to Flink only. The issues were introduced by the source refactoring in FlinkRunner, so there is nothing here that can be extended to the general case.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
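The difference between the old and new behaviour described in this comment can be illustrated with a toy model. This is only a sketch and not FlinkRunner code: the function name `downstream_watermarks`, the `emit_when_idle` flag, and the `(num_elements, source_watermark)` poll tuples are all hypothetical names introduced for illustration.

```python
import math


def downstream_watermarks(polls, emit_when_idle):
    """Return the watermarks forwarded downstream for a sequence of polls.

    Each poll is a (num_elements, source_watermark) tuple. With
    emit_when_idle=False the model mimics the behaviour described as
    "before this PR": nothing is forwarded until the source has emitted
    at least one element. This is a toy model, not the runner's logic.
    """
    forwarded = []
    seen_element = False
    last = -math.inf
    for num_elements, watermark in polls:
        seen_element = seen_element or num_elements > 0
        if not (emit_when_idle or seen_element):
            continue  # watermark is held back while the source is still empty
        if watermark > last:  # watermarks only ever move forward
            forwarded.append(watermark)
            last = watermark
    return forwarded


# An idle source whose (hypothetical) idle policy still advances the watermark.
idle_polls = [(0, 10), (0, 20), (0, 30)]
print(downstream_watermarks(idle_polls, emit_when_idle=False))  # []
print(downstream_watermarks(idle_polls, emit_when_idle=True))   # [10, 20, 30]
```

In the first call nothing reaches downstream operators, so event-time windows and timers would stall; in the second the idle policy's watermark is forwarded even though no element was ever read.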
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614313097

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java: ##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import com.google.auto.value.AutoValue;
+import java.util.HashMap;
+import java.util.Optional;
+import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to process {@link Work} by executing user DoFns for a specific computation. May be reused to
+ * process future work items owned by a computation.
+ *
+ * Should only be accessed by 1 thread at a time.
+ *
+ * @implNote Once closed, it cannot be reused.
+ */
+// TODO(m-trieu): See if this can be combined/cleaned up with StreamingModeExecutionContext as the
+// separation of responsibilities is unclear.
+@AutoValue
+@Internal
+@NotThreadSafe
+public abstract class ComputationWorkExecutor {

Review Comment:
@scwhittle this class is ExecutionState.java, just renamed accordingly
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614304621

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java: ##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import java.time.Duration;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class StreamingCommitFinalizer {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class);
+  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L);
+  private final Cache onCommitFinalizedCache;
+  private final BoundedQueueExecutor workExecutor;
+
+  private StreamingCommitFinalizer(
+      Cache onCommitFinalizedCache, BoundedQueueExecutor workExecutor) {
+    this.onCommitFinalizedCache = onCommitFinalizedCache;
+    this.workExecutor = workExecutor;
+  }
+
+  static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
+    return new StreamingCommitFinalizer(
+        CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
+        workExecutor);
+  }
+
+  /**
+   * Stores a map of user worker generated id's and callbacks to execute once a commit has been
+   * successfully committed to the backing state store.
+   */
+  void cacheCommitFinalizers(Map commitCallbacks) {
+    onCommitFinalizedCache.putAll(commitCallbacks);
+  }
+
+  /**
+   * Calls callbacks for WorkItem to mark that commit has been persisted (finalized) to the backing
+   * state store and to checkpoint the source.
+   */
+  void finalizeCommits(Windmill.WorkItem work) {

Review Comment:
done
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614304419

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java: ##
@@ -193,31 +180,33 @@ public void start(
     for (StepContext stepContext : stepContexts) {
       stepContext.start(
           stateReader,
-          inputDataWatermark,
+          work.watermarks().inputDataWatermark(),

Review Comment:
was able to update all of these
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317:
URL: https://github.com/apache/beam/pull/31317#discussion_r1614297131

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java: ##
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import com.google.api.services.dataflow.model.MapTask;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
+import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
+import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
+import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
+import org.apache.beam.runners.dataflow.worker.ReaderCache;
+import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
+import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
+import org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeException;
+import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
+import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.WorkFailureProcessor;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Schedules execution of user code to process a {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem} then commits the work item
+ * back to streaming execution backend.
+ */
+@Internal
+@ThreadSafe
+public final class StreamingWorkScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingWorkScheduler.class);
+
+  private final DataflowWorkerHarnessOptions options;
+  private final Supplier clock;
+  private final ExecutionStateFactory executionStateFactory;
+  private final SideInputStateFetcher sideInputStateFetcher;
+  private final FailureTracker failureTracker;
+  private final WorkFailureProcessor workFailureProcessor;
+  private final StreamingCommitFinalizer commitFinalizer;
+  private final StreamingCounters streamingCounters;
+  private final HotKeyLogger hotKeyLogger;
+  private final ConcurrentMap stageInfoMap;
+  private final DataflowExecutionStateSampler sampler;
+  private final AtomicInteger maxWorkItemCommitBytes;
+
+  publi
Re: [PR] Fix an incompatibility with hamcrest 2.2 [beam]
Abacn merged PR #31395:
URL: https://github.com/apache/beam/pull/31395
[PR] Parse YAML ExpansionService configs directly using SnakeYAML [beam]
chamikaramj opened a new pull request, #31406:
URL: https://github.com/apache/beam/pull/31406

This fixes https://github.com/apache/beam/issues/31405.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).

To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)

GitHub Actions Tests Status (on master branch)

[![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
[![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
[![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
[![Go tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)

See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
Re: [PR] enable BigQueryIO read throttling detection for Python SDK [beam]
github-actions[bot] commented on PR #31404:
URL: https://github.com/apache/beam/pull/31404#issuecomment-2130548849

Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`:

R: @shunping for label python.
R: @shunping for label io.

Available commands:
- `stop reviewer notifications` - opt out of the automated review tooling
- `remind me after tests pass` - tag the comment author after tests pass
- `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Add try-excepts around data sampler encoding [beam]
github-actions[bot] commented on PR #31396:
URL: https://github.com/apache/beam/pull/31396#issuecomment-2130522811

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Add try-excepts around data sampler encoding [beam]
rohdesamuel commented on PR #31396:
URL: https://github.com/apache/beam/pull/31396#issuecomment-2130522353

R: @KevinGG
Re: [PR] [WIP] Add in redistribute option for Kafka Read [beam]
codecov[bot] commented on PR #31347:
URL: https://github.com/apache/beam/pull/31347#issuecomment-2130487239

## [Codecov](https://app.codecov.io/gh/apache/beam/pull/31347?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report

Attention: Patch coverage is `0%`, with `1 line` in your changes missing coverage. Please review.

> Project coverage is 71.41%. Comparing base [(`092f769`)](https://app.codecov.io/gh/apache/beam/commit/092f769a4b48740c4e6ab6e1ddc2a060986a645e?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`4ada08d`)](https://app.codecov.io/gh/apache/beam/commit/4ada08d2903e34e641452d8f6cc8f7ac45023bfa?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
> Report is 39 commits behind head on master.

| [Files](https://app.codecov.io/gh/apache/beam/pull/31347?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [sdks/python/apache\_beam/io/kafka.py](https://app.codecov.io/gh/apache/beam/pull/31347?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Fio%2Fkafka.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8va2Fma2EucHk=) | 0.00% | [1 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/31347?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #31347      +/-   ##
============================================
- Coverage     71.44%   71.41%   -0.04%
  Complexity     1474     1474
============================================
  Files           909      910       +1
  Lines        113652   113845     +193
  Branches       1076     1076
============================================
+ Hits          81200    81299      +99
- Misses        30430    30526      +96
+ Partials       2022     2020       -2
```

| [Flag](https://app.codecov.io/gh/apache/beam/pull/31347/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [python](https://app.codecov.io/gh/apache/beam/pull/31347/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `81.37% <0.00%> (-0.09%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/31347?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
Re: [PR] enable BigQueryIO read throttling detection for Python SDK [beam]
Abacn commented on code in PR #31404:
URL: https://github.com/apache/beam/pull/31404#discussion_r1614114675

## sdks/python/apache_beam/io/gcp/bigquery.py: ##
@@ -1186,11 +1186,19 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
           parent=parent,
           read_session=requested_session,
           max_stream_count=stream_count)
+      if self.use_native_datetime:
+        display_schema = "Arrow Schema:" + str(read_session.arrow_schema)
+      else:
+        display_schema = "Avro Schema:" + str(read_session.avro_schema)
       _LOGGER.info(

Review Comment:
This makes the log readable. Previously it printed the raw proto message, truncated at the stream URLs. Now it prints only the number of streams returned and the key fields.
Re: [PR] retry test_big_query_write_temp_table_append_schema_update up to 3 times [beam]
liferoad commented on PR #31364:
URL: https://github.com/apache/beam/pull/31364#issuecomment-2130482486

> > @liferoad could you please add details to the PR description about the nature of flakiness in case someone looks up this PR to understand why we added it? Thanks!
>
> More details are in the closed PR, which is in the description, :)

Updated the description with more details now.
[PR] enable BigQueryIO read throttling detection for Python SDK [beam]
Abacn opened a new pull request, #31404:
URL: https://github.com/apache/beam/pull/31404

Before (using a GCP project with limited quota):

https://github.com/apache/beam/assets/8010435/ba8bf7fa-8fbe-4c4e-bd44-a41f5c0e7764

The job upscaled aggressively to 270+ workers before it was cancelled.

After (note: the backend change is not yet rolled out on prod):

(up-hill part, max worker=181)
https://github.com/apache/beam/assets/8010435/ef11b1b8-954a-416a-be6c-8b129d9584d2

(down-hill part)
https://github.com/apache/beam/assets/8010435/be766d98-5e54-4643-b2c3-59677415e6bb

Note: the autoscaler won't kill an active worker until its current work item finishes, and there is a 6 min lapse from worker start until reactive downscaling takes effect, see https://github.com/apache/beam/pull/31253#issuecomment-2118608486
Re: [PR] retry test_big_query_write_temp_table_append_schema_update up to 3 times [beam]
liferoad commented on PR #31364:
URL: https://github.com/apache/beam/pull/31364#issuecomment-2130465877

> @liferoad could you please add details to the PR description about the nature of flakiness in case someone looks up this PR to understand why we added it? Thanks!

More details are in the closed PR, which is in the description, :)
Re: [PR] [ManagedIO] pass underlying transform URN as an annotation [beam]
github-actions[bot] commented on PR #31398:
URL: https://github.com/apache/beam/pull/31398#issuecomment-2130455775

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
Re: [PR] retry test_big_query_write_temp_table_append_schema_update up to 3 times [beam]
tvalentyn commented on PR #31364:
URL: https://github.com/apache/beam/pull/31364#issuecomment-2130455697

@liferoad could you please add details to the PR description about the nature of flakiness in case someone looks up this PR to understand why we added it? Thanks!
Re: [PR] added pytest.mark.flaky for test_big_query_write_temp_table_append_sc… [beam]
tvalentyn merged PR #31364:
URL: https://github.com/apache/beam/pull/31364
Re: [I] [Task] Publish Prism binary (os + architecture combos) artifacts on Beam Release. [beam]
lostluck commented on issue #29697:
URL: https://github.com/apache/beam/issues/29697#issuecomment-2130446494

Per that linked PR, we ended up just building an action for it ourselves. Ultimately, we have the zipped files uploaded, along with signatures and hashes, so it fully matches what we upload to the Apache SVN repository.

The binary zips we're putting up in GitHub release artifacts will be at URLs with the following pattern.

http://github.com/apache/beam/releases/download/RELEASE/apache_beam-RELEASE-prism-OS-ARCH.zip

Note: Beam versions have a `v` prefix, e.g. `v2.56.0`, which needs to match in the URL.

To actually finish #28187 we need to have Java (#31402) and Python (#31403) download the binaries from GitHub. For that, it should be enough to use whatever native library can make the HTTP request and handle the various redirects GitHub puts us through.
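As a rough illustration of the URL pattern in this comment, the sketch below fills in the placeholders. It assumes that `RELEASE` is substituted identically in both positions, including the `v` prefix; the function name `prism_download_url` and the `linux`/`amd64` values are hypothetical, chosen only for the example, and the function only builds the string without downloading anything.

```python
def prism_download_url(version: str, os_name: str, arch: str) -> str:
    """Build a release-artifact URL following the pattern quoted in the comment."""
    # Beam release tags carry a `v` prefix (e.g. `v2.56.0`); add it if missing.
    release = version if version.startswith("v") else f"v{version}"
    return (
        "http://github.com/apache/beam/releases/download/"
        f"{release}/apache_beam-{release}-prism-{os_name}-{arch}.zip"
    )


# Hypothetical OS/ARCH values; the actual set of published combinations
# is not stated in the comment.
print(prism_download_url("2.56.0", "linux", "amd64"))
# http://github.com/apache/beam/releases/download/v2.56.0/apache_beam-v2.56.0-prism-linux-amd64.zip
```

A client fetching this URL would also need to follow the redirects GitHub issues for release assets, as the comment notes.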
Re: [I] [Bug]: Flink 1.17 cannot be selected on the apache-beam package v2.56.0 [beam]
jaehyeon-kim closed issue #31378: [Bug]: Flink 1.17 cannot be selected on the apache-beam package v2.56.0 URL: https://github.com/apache/beam/issues/31378
Re: [PR] [ManagedIO] pass underlying transform URN as an annotation [beam]
chamikaramj commented on code in PR #31398: URL: https://github.com/apache/beam/pull/31398#discussion_r1614065093 ## sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java: ## @@ -522,11 +521,24 @@ public RunnerApi.PTransform translate( } if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) { + ExternalTransforms.SchemaTransformPayload payload = + ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload()); + String identifier = payload.getIdentifier(); transformBuilder.putAnnotations( - SCHEMATRANSFORM_URN_KEY, - ByteString.copyFromUtf8( - ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload()) - .getIdentifier())); + BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY), + ByteString.copyFromUtf8(identifier)); + if (identifier.equals(MANAGED_TRANSFORM_URN)) { +Schema configSchema = + SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); +Row configRow = + RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput()); +String underlyingIdentifier = +MoreObjects.firstNonNull( +configRow.getString("transform_identifier"), "unknown_identifier"); +transformBuilder.putAnnotations( Review Comment: Is there a valid case where "transform_identifier" would not be set ? If not we should just error out. ## model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto: ## @@ -111,6 +111,15 @@ message BuilderMethod { bytes payload = 3; } +message Annotations { + enum Enum { +CONFIG_ROW_KEY = 0 [(org.apache.beam.model.pipeline.v1.beam_constant) = "config_row"]; Review Comment: Let's add short descriptions regarding each of these. 
## sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java: ## @@ -522,11 +521,24 @@ public RunnerApi.PTransform translate( } if (spec.getUrn().equals(BeamUrns.getUrn(SCHEMA_TRANSFORM))) { + ExternalTransforms.SchemaTransformPayload payload = + ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload()); + String identifier = payload.getIdentifier(); transformBuilder.putAnnotations( - SCHEMATRANSFORM_URN_KEY, - ByteString.copyFromUtf8( - ExternalTransforms.SchemaTransformPayload.parseFrom(spec.getPayload()) - .getIdentifier())); + BeamUrns.getConstant(Annotations.Enum.SCHEMATRANSFORM_URN_KEY), + ByteString.copyFromUtf8(identifier)); + if (identifier.equals(MANAGED_TRANSFORM_URN)) { +Schema configSchema = + SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); +Row configRow = + RowCoder.of(configSchema).decode(payload.getConfigurationRow().newInput()); +String underlyingIdentifier = +MoreObjects.firstNonNull( +configRow.getString("transform_identifier"), "unknown_identifier"); Review Comment: Let's add unit tests to make sure that the annotations get added correctly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
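The first review comment asks whether the code should error out instead of falling back to "unknown_identifier". A minimal sketch of that fail-fast alternative, written in Python rather than the PR's Java; the dict-based config row and the function name are hypothetical stand-ins, not Beam API:

```python
def underlying_identifier(config_row: dict) -> str:
    """Return the wrapped transform's identifier, or raise if it is absent.

    `config_row` is a plain dict standing in for the decoded configuration
    row; this is an illustration, not the PR's implementation.
    """
    identifier = config_row.get("transform_identifier")
    if not identifier:
        # Fail loudly rather than annotating the transform with a placeholder.
        raise ValueError(
            "managed transform config is missing 'transform_identifier'"
        )
    return identifier
```

A unit test of the kind requested in the last comment would then cover both the populated case and the missing-field case.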
Re: [PR] Add Iceberg workflows [beam]
github-actions[bot] commented on PR #31401: URL: https://github.com/apache/beam/pull/31401#issuecomment-2130393642 Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
Re: [PR] Add iceberg load test [beam]
ahmedabu98 closed pull request #31399: Add iceberg load test URL: https://github.com/apache/beam/pull/31399
[PR] Add Iceberg workflows [beam]
ahmedabu98 opened a new pull request, #31401: URL: https://github.com/apache/beam/pull/31401 Adding workflows for Iceberg integration and load tests. Integration test already exists. Load test will be added in #31392
[I] [Bug]: ReadFromKafka does not work without max_num_records parameter [beam]
gomerudo opened a new issue, #31400: URL: https://github.com/apache/beam/issues/31400 ### What happened? When running a streaming job (with DirectRunner locally and with DataflowRunner on GCP) that uses the apache_beam.io.kafka.ReadFromKafka connector without `max_num_records`, the job does not process any information and instead gets trapped in an infinite loop of creating consumers that subscribe and get assigned a partition and offset but do not process any information. We are forcing `auto.offset.reset = earliest`. We verified that when setting `max_num_records` the job runs and processes the information correctly both locally and on Dataflow. All of this makes us conclude that this is not a GCP issue but rather a Beam one. We noticed the infinite loop in the logs and we also noticed that Lenses never reports active members of the consumer group: ![image](https://github.com/apache/beam/assets/5495942/aeb6c311-0d53-410d-8c12-db0b24114a44) ![image](https://github.com/apache/beam/assets/5495942/46a0f513-f48c-448c-a264-7e100b463b65) We have tried the default Kafka configurations as well as custom ones. 
I'm just sharing the latest:

```
pipeline | "ReadFromStream" >> apache_beam.io.kafka.ReadFromKafka(
    consumer_config={
        # Also tested with a single broker
        "bootstrap.servers": "kafka-1782273228-1-1908664276.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-4.prod.walmart.com:9092,kafka-1782274279-1-1908664354.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-5.prod.walmart.com:9092,kafka-1782274320-1-1908664432.wmt-uscentral.kafka-v2-qarth-reg-stg.ms-df-messaging.prod-wmt-uscentral-6.prod.walmart.com:9092",
        "auto.offset.reset": "earliest",
        "fetch.max.bytes": "52428800",
        "fetch.min.bytes": "1",
        "fetch.max.wait.ms": "1000",
        "max.poll.interval.ms": "2",
        "max.poll.records": "10",
        "request.timeout.ms": "3",
        "session.timeout.ms": "45000",
        "timeout.ms": "1",
        "group.id": "test-group-id",
        "heartbeat.interval.ms": "200",
        "reconnect.backoff.ms": "100",
        "reconnect.backoff.max.ms": "1",
    },
    topics=["some-topic-i-cannot-share"],
    with_metadata=True,
    # max_num_records=1000  # For testing only
)
```

This does not seem to be a problem with our Kafka topic, since custom Python clients (that use kafka-python) run successfully with the exact same Kafka configuration. Beam SDK language: Python Beam SDK version: 2.52.0 Any feedback is greatly appreciated. ### Issue Priority Priority: 2 (default / most bugs should be filed as P2) ### Issue Components - [X] Component: Python SDK - [ ] Component: Java SDK - [ ] Component: Go SDK - [ ] Component: Typescript SDK - [ ] Component: IO connector - [ ] Component: Beam YAML - [ ] Component: Beam examples - [ ] Component: Beam playground - [ ] Component: Beam katas - [ ] Component: Website - [ ] Component: Spark Runner - [ ] Component: Flink Runner - [ ] Component: Samza Runner - [ ] Component: Twister2 Runner - [ ] Component: Hazelcast Jet Runner - [ ] Component: Google Cloud Dataflow Runner -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Add try-excepts around data sampler encoding [beam]
codecov[bot] commented on PR #31396: URL: https://github.com/apache/beam/pull/31396#issuecomment-2130339852 ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/31396?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report Attention: Patch coverage is `55.6%` with `4 lines` in your changes are missing coverage. Please review. > Project coverage is 77.62%. Comparing base [(`b34cf54`)](https://app.codecov.io/gh/apache/beam/commit/b34cf54b0a1511e88f76836d040c4ee67e420a71?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`7677083`)](https://app.codecov.io/gh/apache/beam/commit/76770833c99b7962fab52947cf4d4cbac44048e8?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). > Report is 280 commits behind head on master.

| [Files](https://app.codecov.io/gh/apache/beam/pull/31396?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Patch % | Lines |
|---|---|---|
| [.../python/apache\_beam/runners/worker/data\_sampler.py](https://app.codecov.io/gh/apache/beam/pull/31396?src=pr&el=tree&filepath=sdks%2Fpython%2Fapache_beam%2Frunners%2Fworker%2Fdata_sampler.py&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9zYW1wbGVyLnB5) | 55.55% | [4 Missing :warning: ](https://app.codecov.io/gh/apache/beam/pull/31396?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) |

Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #31396      +/-   ##
+ Coverage     71.44%   77.62%   +6.18%
- Complexity     1474     2980    +1506
  Files           906      760     -146
  Lines        113271    95406   -17865
  Branches       1076     3229    +2153
- Hits          80931    74063    -6868
+ Misses        30327    19880   -10447
+ Partials       2013     1463     -550
```

| [Flag](https://app.codecov.io/gh/apache/beam/pull/31396/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [go](https://app.codecov.io/gh/apache/beam/pull/31396/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
| [python](https://app.codecov.io/gh/apache/beam/pull/31396/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `81.36% <55.55%> (-0.16%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/31396?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
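The headline percentages in the report for PR #31396 can be reproduced from its counts. A small sketch, assuming coverage is hits divided by hits + misses + partials and is truncated (not rounded) to two decimals, which is what the reported 71.44% and 77.62% are consistent with; the function name is hypothetical, and the counts are read from the report as base 80931/30327/2013 and head 74063/19880/1463:

```python
def coverage_percent(hits: int, misses: int, partials: int) -> float:
    """Coverage as a percentage, truncated to two decimal places.

    Assumes total lines = hits + misses + partials, and truncation rather
    than rounding; both are inferences from the figures in the report.
    """
    total = hits + misses + partials
    # Integer arithmetic keeps the truncation exact.
    return (hits * 10000 // total) / 100


if __name__ == "__main__":
    print(coverage_percent(80931, 30327, 2013))  # base (master) counts
    print(coverage_percent(74063, 19880, 1463))  # head (#31396) counts
```

With rounding instead of truncation, both figures would come out 0.01 higher than reported.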
Re: [PR] [ManagedIO] pass underlying transform URN as an annotation [beam]
codecov[bot] commented on PR #31398: URL: https://github.com/apache/beam/pull/31398#issuecomment-2130339854 ## [Codecov](https://app.codecov.io/gh/apache/beam/pull/31398?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report All modified and coverable lines are covered by tests :white_check_mark: > Project coverage is 68.53%. Comparing base [(`1f63196`)](https://app.codecov.io/gh/apache/beam/commit/1f6319624a44ce113c39602cd4b6fff6c6db638a?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`dd20a70`)](https://app.codecov.io/gh/apache/beam/commit/dd20a70d505eac888379d1d1139101b9a2ae31b7?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). > Report is 14 commits behind head on master. Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master   #31398      +/-   ##
- Coverage     68.55%   68.53%   -0.02%
  Complexity    14921    14921
  Files          2636     2637       +1
  Lines        222092   222295     +203
  Branches      11826    11826
+ Hits         152250   152358     +108
- Misses        63647    63742      +95
  Partials       6195     6195
```

| [Flag](https://app.codecov.io/gh/apache/beam/pull/31398/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [python](https://app.codecov.io/gh/apache/beam/pull/31398/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `81.37% <ø> (-0.09%)` | :arrow_down: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more. [:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/beam/pull/31398?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
[PR] Add iceberg load test [beam]
ahmedabu98 opened a new pull request, #31399: URL: https://github.com/apache/beam/pull/31399 (no comment)
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1614002144 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java: ## @@ -97,18 +104,19 @@ public final class StreamingEngineClient { private StreamingEngineClient( JobHeader jobHeader, GetWorkBudget totalGetWorkBudget, - AtomicReference connections, GrpcWindmillStreamFactory streamFactory, - WorkItemProcessor workItemProcessor, + WorkItemScheduler workItemScheduler, ChannelCachingStubFactory channelCachingStubFactory, GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, - long clientId) { + long clientId, + Function workCommitterFactory, + Consumer> heartbeatResponseProcessor) { this.jobHeader = jobHeader; this.started = new AtomicBoolean(); Review Comment: finish method (like stop in StreamingDataflowWorker) is only there for testing so i added annotation there as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1614001559 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java: ## @@ -97,18 +104,19 @@ public final class StreamingEngineClient { private StreamingEngineClient( JobHeader jobHeader, GetWorkBudget totalGetWorkBudget, - AtomicReference connections, GrpcWindmillStreamFactory streamFactory, - WorkItemProcessor workItemProcessor, + WorkItemScheduler workItemScheduler, ChannelCachingStubFactory channelCachingStubFactory, GetWorkBudgetDistributor getWorkBudgetDistributor, GrpcDispatcherClient dispatcherClient, - long clientId) { + long clientId, + Function workCommitterFactory, + Consumer> heartbeatResponseProcessor) { this.jobHeader = jobHeader; this.started = new AtomicBoolean(); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613998893 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ExecuteWorkResult.java: ## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.work.processing; + +import com.google.auto.value.AutoValue; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; + +/** Value class that represents the result of executing user DoFns. */ +@AutoValue +abstract class ExecuteWorkResult { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613998310 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.dataflow.worker.windmill.work.processing; + +import java.time.Duration; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class StreamingCommitFinalizer { + private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class); + private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L); + private final Cache onCommitFinalizedCache; + private final BoundedQueueExecutor workExecutor; + + private StreamingCommitFinalizer( + Cache onCommitFinalizedCache, BoundedQueueExecutor workExecutor) { +this.onCommitFinalizedCache = onCommitFinalizedCache; +this.workExecutor = workExecutor; + } + + static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) { +return new StreamingCommitFinalizer( + CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(), +workExecutor); + } + + /** + * Stores a map of user worker generated id's and callbacks to execute once a commit has been + * successfully committed to the backing state store. + */ + void cacheCommitFinalizers(Map commitCallbacks) { +onCommitFinalizedCache.putAll(commitCallbacks); + } + + /** + * Calls callbacks for WorkItem to mark that commit has been persisted (finalized) to the backing + * state store and to checkpoint the source. + */ + void finalizeCommits(Windmill.WorkItem work) { Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
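The quoted `StreamingCommitFinalizer` keeps per-commit callbacks in a cache whose entries expire five minutes after being written. The body of `finalizeCommits` is cut off in the diff above, so the following Python sketch only guesses at the general shape of such a cache. The class name, method names, and the injectable clock are assumptions, and it runs callbacks inline rather than on an executor as the Java class's `workExecutor` field suggests:

```python
import time
from typing import Callable, Dict, Iterable, Tuple


class CommitFinalizerCache:
    """Hypothetical stand-in for a callback cache with expire-after-write."""

    def __init__(self, ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock  # injectable so expiry can be tested deterministically
        self._entries: Dict[int, Tuple[float, Callable[[], None]]] = {}

    def cache_commit_finalizers(self, callbacks: Dict[int, Callable[[], None]]) -> None:
        """Store callbacks keyed by id, each with its own expiry deadline."""
        deadline = self._clock() + self._ttl
        for finalize_id, callback in callbacks.items():
            self._entries[finalize_id] = (deadline, callback)

    def finalize_commits(self, finalize_ids: Iterable[int]) -> int:
        """Run and remove the callbacks for the given ids; return how many ran."""
        now = self._clock()
        ran = 0
        for finalize_id in finalize_ids:
            entry = self._entries.pop(finalize_id, None)
            if entry is None:
                continue  # unknown id, or already finalized
            deadline, callback = entry
            if now < deadline:  # expired entries are dropped without running
                callback()
                ran += 1
        return ran
```

Each callback runs at most once because it is removed from the cache before being invoked.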
Re: [PR] Fix iceberg catalog validation [beam]
github-actions[bot] commented on PR #31349: URL: https://github.com/apache/beam/pull/31349#issuecomment-2130290123 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Fix iceberg catalog validation [beam]
ahmedabu98 commented on PR #31349: URL: https://github.com/apache/beam/pull/31349#issuecomment-2130288066 R: @VeronicaWasson
Re: [PR] Default SchemaTransform configs to snake_case [beam]
ahmedabu98 commented on PR #31374: URL: https://github.com/apache/beam/pull/31374#issuecomment-2130287268 Fixes #31353
Re: [PR] Default SchemaTransform configs to snake_case [beam]
github-actions[bot] commented on PR #31374: URL: https://github.com/apache/beam/pull/31374#issuecomment-2130283093 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Default SchemaTransform configs to snake_case [beam]
ahmedabu98 commented on PR #31374: URL: https://github.com/apache/beam/pull/31374#issuecomment-2130281990 R: @robertwb R: @chamikaramj CC: @lostluck (if this affects Go at all) CC: @Polber (if any more changes are needed for YAML)
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613967488 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java: ## @@ -22,10 +22,19 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoValue -public abstract class ExecutionState { +@Internal +public abstract class ExecutionState implements AutoCloseable { Review Comment: `ExecutionState` is also specific to a computation so will add comment and rename accordingly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613964582 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java: ## @@ -22,10 +22,19 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoValue -public abstract class ExecutionState { +@Internal +public abstract class ExecutionState implements AutoCloseable { Review Comment: I think it will be worth exploring cleaning up StreamingModeExecutionContext + ExecutionState looks like `streaming.worker.ExecutionState` (there is another execution state StreamingModeExecutionState which extends `DataflowExecutionState` which extends `core.metrics.ExecutionState` which seems more for metric sampling) There could be a lot of clean up here Added a comment and TODO to clean up as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613958837 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java: ## @@ -151,37 +154,22 @@ public boolean workIsFailed() { public void start( @Nullable Object key, - Windmill.WorkItem work, - Instant inputDataWatermark, - @Nullable Instant outputDataWatermark, - @Nullable Instant synchronizedProcessingTime, + Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, - Windmill.WorkItemCommitRequest.Builder outputBuilder, - @Nullable Supplier workFailed) { + Windmill.WorkItemCommitRequest.Builder outputBuilder) { this.key = key; -this.work = work; -this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE; +this.work = work.getWorkItem(); +this.workIsFailed = work::isFailed; this.computationKey = -WindmillComputationKey.create(computationId, work.getKey(), work.getShardingKey()); +WindmillComputationKey.create( +computationId, work.getWorkItem().getKey(), work.getWorkItem().getShardingKey()); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613957781 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java: ## @@ -87,6 +89,7 @@ @SuppressWarnings({ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) +@NotThreadSafe Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]
akashk99 commented on issue #31313: URL: https://github.com/apache/beam/issues/31313#issuecomment-2130258010 I'm surprised this is a bug, considering that restoring from a Flink savepoint is a pretty common use case. Is it possible there is some configuration missing somewhere? I haven't been able to find anyone else online experiencing this same issue, but I was able to replicate it using both Kinesis and Kafka. Given how common a use case it is, I'm not 100% sure this is in fact a bug; it is most likely some user error on my part. I can make do without savepoints by using Kafka offset commits and consumer groups to ensure no data is lost, but I can't figure out a way to avoid losing data that is windowed but not yet triggered when the Flink application is stopped. Maybe you know of a solution to that problem? It also seems like a lot of the subtasks aren't being utilized when stripping IDs with beam_fn_api, despite the number of shards being 20 and parallelism being 24 (in theory there should be only 4 idle subtasks). ![image](https://github.com/apache/beam/assets/38279340/fda0120c-c1a2-4d68-929a-d4ab47f1c57c)
Re: [PR] [#29697] Add prism artifact building workflow. [beam]
github-actions[bot] commented on PR #31369: URL: https://github.com/apache/beam/pull/31369#issuecomment-2130253610 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] [#29697] Add prism artifact building workflow. [beam]
lostluck commented on PR #31369: URL: https://github.com/apache/beam/pull/31369#issuecomment-2130252309 R: @kennknowles cc: @damccorm @jrmccluskey @abacn
Re: [PR] Enable to handle NotFound and AccessDenied errors in the streaming insert of `BigQueryIO` [beam]
damccorm commented on PR #31310: URL: https://github.com/apache/beam/pull/31310#issuecomment-2130202164 > But, Dataflow doesn't recommend using a dead-letter topic. That is strictly for pubsub reads. There is nothing wrong with using a dead letter queue for failed records (this is actually an encouraged pattern, see "When performing writes from Dataflow to a connector, consider using an [ErrorHandler](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.html) to handle any failed writes..." https://cloud.google.com/dataflow/docs/guides/io-connector-best-practices)
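The dead-letter pattern mentioned in the comment above (diverting records that fail to write instead of failing the whole batch) can be sketched in plain Java. This is only an illustration under stated assumptions, not Beam's `ErrorHandler` API: `DeadLetterSketch`, `writeAll`, and `Result` are hypothetical names, and the sink is simply a function that may throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of a dead-letter queue; this is NOT the Beam ErrorHandler API.
final class DeadLetterSketch {

  // Holds the records that were written and the ones diverted to the dead-letter list.
  static final class Result<T> {
    final List<T> written = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();
  }

  // Tries to write each record; a failure is recorded instead of aborting the batch.
  static <T> Result<T> writeAll(List<T> records, Consumer<T> sink) {
    Result<T> result = new Result<>();
    for (T record : records) {
      try {
        sink.accept(record);
        result.written.add(record);
      } catch (RuntimeException e) {
        // Keep the failed record with the failure reason so it can be inspected or replayed.
        result.deadLetters.add(record + " -> " + e.getMessage());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumed failure mode for the demo: the sink rejects rows aimed at a missing table.
    Result<String> result =
        writeAll(
            List.of("row-1", "row-missing-table", "row-2"),
            row -> {
              if (row.contains("missing")) {
                throw new IllegalStateException("NotFound");
              }
            });
    System.out.println("written=" + result.written);
    System.out.println("deadLetters=" + result.deadLetters);
  }
}
```

In a real pipeline the dead-letter records would go to a durable sink rather than an in-memory list; the in-memory list is used here only to keep the sketch self-contained.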
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613894213 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java: ## @@ -193,31 +180,33 @@ public void start( for (StepContext stepContext : stepContexts) { stepContext.start( stateReader, -inputDataWatermark, +work.watermarks().inputDataWatermark(), Review Comment: Done; added a TODO. I will probably update every usage where `Instant inputDataWatermark, Instant outputDataWatermark, Instant synchronizedProcessingTime` are passed together so that it takes `Work.Watermarks` instead. Since that is the case, I will move `Watermarks` out of Work.java.
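The refactoring described in this review comment, passing one watermark holder instead of three separate timestamps, can be sketched as follows. This is a hedged illustration only: `WatermarksSketch` is a hypothetical class, it uses `java.time.Instant` rather than the Joda `Instant` Beam uses, and only the accessor name `inputDataWatermark()` is taken from the diff; the real `Work.Watermarks` may look different.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;

// Hypothetical holder for the three timestamps; the real Work.Watermarks in Beam may differ.
final class WatermarksSketch {
  private final Instant inputDataWatermark;
  private final Instant outputDataWatermark; // nullable, as in the old method signature
  private final Instant synchronizedProcessingTime; // nullable, as in the old method signature

  WatermarksSketch(
      Instant inputDataWatermark,
      Instant outputDataWatermark,
      Instant synchronizedProcessingTime) {
    this.inputDataWatermark = Objects.requireNonNull(inputDataWatermark, "inputDataWatermark");
    this.outputDataWatermark = outputDataWatermark;
    this.synchronizedProcessingTime = synchronizedProcessingTime;
  }

  Instant inputDataWatermark() {
    return inputDataWatermark;
  }

  Optional<Instant> outputDataWatermark() {
    return Optional.ofNullable(outputDataWatermark);
  }

  Optional<Instant> synchronizedProcessingTime() {
    return Optional.ofNullable(synchronizedProcessingTime);
  }

  // Stand-in for a method like start(...): one parameter replaces three timestamps.
  static boolean hasOutputWatermark(WatermarksSketch watermarks) {
    return watermarks.outputDataWatermark().isPresent();
  }

  public static void main(String[] args) {
    WatermarksSketch w =
        new WatermarksSketch(Instant.ofEpochSecond(10), null, Instant.ofEpochSecond(5));
    System.out.println(w.inputDataWatermark());
    System.out.println(hasOutputWatermark(w));
  }
}
```

Grouping the values this way keeps the nullability of each timestamp in one place instead of repeating `@Nullable` on every method that forwards them.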
Re: [PR] [flink] #31390 emit watermark with empty source [beam]
Abacn commented on PR #31391: URL: https://github.com/apache/beam/pull/31391#issuecomment-2130169518 Thanks, taking a look. In the meantime, I have a couple of questions (not directly related to the change): - This sounds similar to #30969; what is the difference here? - I also observed a similar issue with JmsIO on the Dataflow runner ("watermark does not increase when there is no incoming data for a while"), and the fix in #30337 didn't work. I am wondering whether #31390 is generic at the SDK level and whether a fix could be proposed in general?
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
m-trieu commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613890068 ## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java: ## @@ -152,37 +154,22 @@ public boolean workIsFailed() { public void start( @Nullable Object key, - Windmill.WorkItem work, - Instant inputDataWatermark, - @Nullable Instant outputDataWatermark, - @Nullable Instant synchronizedProcessingTime, + Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, - Windmill.WorkItemCommitRequest.Builder outputBuilder, - @Nullable Supplier workFailed) { + Windmill.WorkItemCommitRequest.Builder outputBuilder) { this.key = key; -this.work = work; -this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE; +this.work = work.getWorkItem(); Review Comment: done
Re: [PR] add support for ConnectionFactory ProviderFn in JmsIO [beam]
Abacn commented on PR #31264: URL: https://github.com/apache/beam/pull/31264#issuecomment-2130151490 waiting on author
Re: [PR] Add try-excepts around data sampler encoding [beam]
github-actions[bot] commented on PR #31396: URL: https://github.com/apache/beam/pull/31396#issuecomment-2130145185 Assigning reviewers. If you would like to opt out of this review, comment `assign to next reviewer`: R: @riteshghorse for label python. Available commands: - `stop reviewer notifications` - opt out of the automated review tooling - `remind me after tests pass` - tag the comment author after tests pass - `waiting on author` - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers) The PR bot will only process comments in the main thread (not review comments).
Re: [PR] Update beam-master version to 20240524. [beam]
damccorm merged PR #31393: URL: https://github.com/apache/beam/pull/31393
[I] [Bug]: error when trying to serialize headers with LongString [beam]
alexxfreitag opened a new issue, #31397: URL: https://github.com/apache/beam/issues/31397

### What happened?

Apache Beam version: 2.55.0
Java version: 17

When attempting to read messages with List type headers, a NotSerializableException error is being returned.

Example: ![image](https://github.com/apache/beam/assets/32180411/f7acc148-fd20-4e3b-b630-57889a3ce164)

If the headers are String, Number, or Boolean, it works normally. I have tried using a CustomCoder with `.setCoder(new CustomCoder())` to try to avoid the headers that are not important in this case, but it appears to throw the error before reaching this coder.

Stacktrace:
```
Error message from worker: java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f9a0d014}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(SerializableCoder(org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage))'.
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:642)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:558)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:384)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:128)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:67)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1263)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:999)
org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81)
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeLockHeld$0(BoundedQueueExecutor.java:183)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
java.base/java.util.ArrayList.writeObject(ArrayList.java:866)
java.base/jdk.internal.reflect.GeneratedMethodAccessor62.invoke(Unknown Source)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:568)
java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
java.base/java.util.HashMap.internalWriteEntries(HashMap.java:1944)
java.base/java.util.HashMap.writeObject(HashMap.java:1497)
java.base/jdk.internal.reflect.GeneratedMethodAccessor61.invoke(Unknown Source)
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.base/java.lang.reflect.Method.invoke(Method.java:568)
java.base/java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1070)
java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1516)
java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1438)
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1181)
```
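The `Caused by` part of the trace suggests that Java serialization walks a `HashMap` into an `ArrayList` and fails on an element type that does not implement `Serializable`. The following minimal sketch reproduces that mechanism without RabbitMQ or Beam. `OpaqueText` is a hypothetical stand-in for the non-serializable header value, and `sanitize` (converting such values to strings) is only one possible workaround assumed here, not a fix confirmed in the issue.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HeaderSerializationSketch {

  // Hypothetical stand-in for a header value type that does not implement Serializable.
  static final class OpaqueText {
    private final byte[] bytes;

    OpaqueText(String text) {
      this.bytes = text.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String toString() {
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  // Returns true if Java serialization accepts the whole header map.
  static boolean isJavaSerializable(Map<String, Object> headers) {
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(new HashMap<>(headers));
      return true;
    } catch (NotSerializableException e) {
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Replaces non-serializable values, including those nested in lists, with their string form.
  static Object sanitize(Object value) {
    if (value instanceof List) {
      List<Object> copy = new ArrayList<>();
      for (Object item : (List<?>) value) {
        copy.add(sanitize(item));
      }
      return copy;
    }
    return (value == null || value instanceof Serializable) ? value : value.toString();
  }

  // Applies sanitize to every header value and returns a new map.
  static Map<String, Object> sanitizeAll(Map<String, Object> headers) {
    Map<String, Object> copy = new HashMap<>();
    for (Map.Entry<String, Object> entry : headers.entrySet()) {
      copy.put(entry.getKey(), sanitize(entry.getValue()));
    }
    return copy;
  }

  public static void main(String[] args) {
    List<Object> listHeader = new ArrayList<>();
    listHeader.add(new OpaqueText("value-in-list"));
    Map<String, Object> headers = new HashMap<>();
    headers.put("plain", "text");
    headers.put("list", listHeader);
    System.out.println("raw serializable: " + isJavaSerializable(headers));
    System.out.println("sanitized serializable: " + isJavaSerializable(sanitizeAll(headers)));
  }
}
```

The list check comes before the `Serializable` check because `ArrayList` itself is serializable even when its elements are not, which matches the path through `ArrayList.writeObject` in the trace.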
[PR] Add try-excepts around data sampler encoding [beam]
rohdesamuel opened a new pull request, #31396: URL: https://github.com/apache/beam/pull/31396 Add try-excepts around data sampler encoding Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #` instead. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier). To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md) GitHub Actions Tests Status (on master branch) [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule) [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule) [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule) [![Go 
tests](https://github.com/apache/beam/workflows/Go%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule) See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI or the [workflows README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) to see a list of phrases to trigger workflows.
Re: [PR] Add option to disable validation of cloud bigtable change stream IO [beam]
svetakvsundhar commented on PR #31376: URL: https://github.com/apache/beam/pull/31376#issuecomment-2130065700 Sounds good. Will merge after CI completes! Thanks for the quick turnaround!
Re: [PR] Add option to disable validation of cloud bigtable change stream IO [beam]
tonytanger commented on PR #31376: URL: https://github.com/apache/beam/pull/31376#issuecomment-2130063711 @svetakvsundhar we're ready to merge. Thanks.
Re: [PR] Update beam-master version to 20240524. [beam]
shunping commented on PR #31393: URL: https://github.com/apache/beam/pull/31393#issuecomment-2130046766 Run Python_ML PreCommit 3.12
Re: [PR] Add option to disable validation of cloud bigtable change stream IO [beam]
jackdingilian commented on code in PR #31376: URL: https://github.com/apache/beam/pull/31376#discussion_r1613798994 ## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java: ## @@ -2284,25 +2286,77 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) { return toBuilder().setBacklogReplicationAdjustment(adjustment).build(); } +/** Review Comment: We should note that this disables metadata table creation / update and that will need to be done explicitly when this is enabled (and the table isn't created / up-to-date) ## sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java: ## @@ -2284,25 +2286,77 @@ public ReadChangeStream withBacklogReplicationAdjustment(Duration adjustment) { return toBuilder().setBacklogReplicationAdjustment(adjustment).build(); } +/** + * Disables validation that the table being read and the metadata table exists, and that the app + * profile used is single cluster and single row transcation enabled. Set this option if the + * caller does not have additional Bigtable permissions to validate the configurations. 
+ */ +public ReadChangeStream withoutValidation() { + BigtableConfig config = getBigtableConfig(); + BigtableConfig metadataTableConfig = getMetadataTableBigtableConfig(); + return toBuilder() + .setBigtableConfig(config.withValidate(false)) + .setMetadataTableBigtableConfig(metadataTableConfig.withValidate(false)) + .setValidateConfig(false) + .build(); +} + +@Override +public void validate(PipelineOptions options) { + BigtableServiceFactory factory = new BigtableServiceFactory(); + if (getBigtableConfig().getValidate()) { +try { + checkArgument( + factory.checkTableExists(getBigtableConfig(), options, getTableId()), + "Change Stream table %s does not exist", + getTableId()); +} catch (IOException e) { + throw new RuntimeException(e); +} + } +} + +// Validate the app profile is single cluster and allows single row transactions. +private void validateAppProfile( +MetadataTableAdminDao metadataTableAdminDao, String appProfileId) { + checkArgument(metadataTableAdminDao != null); + checkArgument( + metadataTableAdminDao.isAppProfileSingleClusterAndTransactional(appProfileId), + "App profile id '" + + appProfileId + + "' provided to access metadata table needs to use single-cluster routing policy" + + " and allow single-row transactions."); +} + +// Update metadata table schema if allowed and required. +private void createOrUpdateMetadataTable( +MetadataTableAdminDao metadataTableAdminDao, String metadataTableId) { + boolean shouldCreateOrUpdateMetadataTable = true; + if (getCreateOrUpdateMetadataTable() != null) { +shouldCreateOrUpdateMetadataTable = getCreateOrUpdateMetadataTable(); + } + // Only try to create or update metadata table if option is set to true. Otherwise, just + // check if the table exists. 
+ if (shouldCreateOrUpdateMetadataTable && metadataTableAdminDao.createMetadataTable()) { +LOG.info("Created metadata table: " + metadataTableId); + } +} + @Override public PCollection> expand(PBegin input) { checkArgument( getBigtableConfig() != null, "BigtableIO ReadChangeStream is missing required configurations fields."); - checkArgument( - getBigtableConfig().getProjectId() != null, "Missing required projectId field."); - checkArgument( - getBigtableConfig().getInstanceId() != null, "Missing required instanceId field."); + getBigtableConfig().validate(); checkArgument(getTableId() != null, "Missing required tableId field."); BigtableConfig bigtableConfig = getBigtableConfig(); Review Comment: Nit: move this before the two calls to getBigtableConfig within this method above
Re: [PR] Fix an incompatibility with hamcrest 2.2 [beam]
github-actions[bot] commented on PR #31395: URL: https://github.com/apache/beam/pull/31395#issuecomment-2130008700 Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
Re: [PR] Update beam-master version to 20240524. [beam]
shunping commented on PR #31393: URL: https://github.com/apache/beam/pull/31393#issuecomment-2129992467 Run Python_Integration PreCommit 3.12
[PR] Fix an incompatibility with hamcrest 2.2 [beam]
cushon opened a new pull request, #31395: URL: https://github.com/apache/beam/pull/31395 This test asserts on the message of an AssertionError thrown by a failing hamcrest assertion. The message changes in the latest versions of hamcrest. ``` 1) testBadCoderIsNotDeterministic(org.apache.beam.sdk.testing.CoderPropertiesTest) java.lang.AssertionError: Expected: a string containing "<84>, <101>, <115>, <116>, <68>" but: was " Expected: [<24b>, <84b>, <101b>, <115b>, <116b>, <68b>, <97b>, <116b>, <97b>, <51b>, <51b>, <50b>, <54b>, <49b>, <57b>, <57b>, <52b>, <57b>, <48b>, <57b>, <57b>, <55b>, <48b>, <53b>, <53b>] but: was [<24b>, <84b>, <101b>, <115b>, <116b>, <68b>, <97b>, <116b>, <97b>, <51b>, <51b>, <50b>, <54b>, <49b>, <57b>, <57b>, <52b>, <56b>, <50b>, <48b>, <52b>, <49b>, <54b>, <49b>, <53b>]" at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.beam.sdk.testing.CoderPropertiesTest.testBadCoderIsNotDeterministic(CoderPropertiesTest.java:123) ```
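The failure above comes from the byte rendering in the assertion message changing from `<84>` to `<84b>`. One way to make such a check tolerant of both renderings is to match the message against a pattern with an optional `b` suffix. The sketch below is an assumption about how that could be done and is not necessarily what the PR changes; `ByteMessageMatchSketch`, `bytesPattern`, and `mentionsBytes` are hypothetical names, and hamcrest itself is not used so that the example stays self-contained.

```java
import java.util.regex.Pattern;

// Hypothetical helper; not taken from the PR or from hamcrest.
final class ByteMessageMatchSketch {

  // Builds a pattern that accepts both "<84>" and "<84b>" renderings of each byte value.
  static Pattern bytesPattern(int... values) {
    StringBuilder regex = new StringBuilder();
    for (int i = 0; i < values.length; i++) {
      if (i > 0) {
        regex.append(", ");
      }
      regex.append('<').append(values[i]).append("b?>");
    }
    return Pattern.compile(regex.toString());
  }

  // Returns true if the message contains the byte sequence in either rendering.
  static boolean mentionsBytes(String message, int... values) {
    return bytesPattern(values).matcher(message).find();
  }

  public static void main(String[] args) {
    String newer = "was [<24b>, <84b>, <101b>, <115b>, <116b>, <68b>]";
    String older = "was [<24>, <84>, <101>, <115>, <116>, <68>]";
    System.out.println(mentionsBytes(newer, 84, 101, 115, 116, 68));
    System.out.println(mentionsBytes(older, 84, 101, 115, 116, 68));
  }
}
```

Asserting on the thrown exception type, or on a fragment of the message that does not depend on value formatting, would avoid the coupling to a specific hamcrest version altogether.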
Re: [PR] added pytest.mark.flaky for test_big_query_write_temp_table_append_sc… [beam]
tvalentyn commented on PR #31364: URL: https://github.com/apache/beam/pull/31364#issuecomment-2129883889 PTAL at PreCommit Python Lint / beam_PreCommit_PythonLint (Run PythonLint PreCommit) (pull_request_target) Failing after 16m
Re: [PR] Don't install TF on Python 3.12, since there is no compatible version atm. [beam]
tvalentyn merged PR #31386: URL: https://github.com/apache/beam/pull/31386
Re: [I] [Failing Test]: Python 3.12 ML Precommit suite is failing [beam]
tvalentyn closed issue #31385: [Failing Test]: Python 3.12 ML Precommit suite is failing URL: https://github.com/apache/beam/issues/31385
Re: [PR] Don't install TF on Python 3.12, since there is no compatible version atm. [beam]
tvalentyn commented on PR #31386: URL: https://github.com/apache/beam/pull/31386#issuecomment-2129872124 Py3.12 suite passed: https://github.com/apache/beam/actions/runs/9216485596/job/25356862179?pr=31386, other suites are not affected by this PR.
Re: [PR] [flink] #31390 emit watermark with empty source [beam]
je-ik commented on PR #31391: URL: https://github.com/apache/beam/pull/31391#issuecomment-2129870111 > The PVR test seems to be stuck at `ViewTest.testTriggeredLatestSingleton`. I can observe this locally on both `master` and `release-2.56.0` branches. Did this check complete successfully recently? Hm, it passed on the second run.
Re: [PR] [flink] #31390 emit watermark with empty source [beam]
github-actions[bot] commented on PR #31391: URL: https://github.com/apache/beam/pull/31391#issuecomment-2129810043 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] [flink] #31390 emit watermark with empty source [beam]
je-ik commented on PR #31391: URL: https://github.com/apache/beam/pull/31391#issuecomment-2129807437 R: @Abacn The PVR test seems to be stuck at `ViewTest.testTriggeredLatestSingleton`. I can observe this locally on both `master` and `release-2.56.0` branches. Did this check complete successfully recently?
Re: [PR] 31112 drop flink 1.14 [beam]
github-actions[bot] commented on PR #31394: URL: https://github.com/apache/beam/pull/31394#issuecomment-2129699085 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] 31112 drop flink 1.14 [beam]
je-ik commented on PR #31394: URL: https://github.com/apache/beam/pull/31394#issuecomment-2129696424 R: @Abacn
Re: [PR] Add iceberg load test [beam]
github-actions[bot] commented on PR #31392: URL: https://github.com/apache/beam/pull/31392#issuecomment-2129679833 Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
[PR] 31112 drop flink 1.14 [beam]
je-ik opened a new pull request, #31394: URL: https://github.com/apache/beam/pull/31394 Closes #31112
Re: [PR] Update beam-master version to 20240524. [beam]
github-actions[bot] commented on PR #31393: URL: https://github.com/apache/beam/pull/31393#issuecomment-2129657276 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Update beam-master version to 20240524. [beam]
shunping commented on PR #31393: URL: https://github.com/apache/beam/pull/31393#issuecomment-2129654893 R: @damccorm
[PR] Update beam-master version to 20240524. [beam]
shunping opened a new pull request, #31393: URL: https://github.com/apache/beam/pull/31393 There is a recent update on the version dependency of `google-cloud-storage`, so we need a new beam-master image.
Re: [PR] [flink] #31390 emit watermark with empty source [beam]
github-actions[bot] commented on PR #31391: URL: https://github.com/apache/beam/pull/31391#issuecomment-2129622422 Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`
Re: [PR] Kafka SchemaTransform translation [beam]
github-actions[bot] commented on PR #31362: URL: https://github.com/apache/beam/pull/31362#issuecomment-2129593876 Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control
Re: [PR] Kafka SchemaTransform translation [beam]
ahmedabu98 commented on PR #31362: URL: https://github.com/apache/beam/pull/31362#issuecomment-2129591262 R: @chamikaramj R: @damccorm
Re: [I] Support MapState in DataflowRunner [beam]
scwhittle commented on issue #18200: URL: https://github.com/apache/beam/issues/18200#issuecomment-2129583396 We could do so by implementing it on top of MultimapState, similar to how it is done for the fnapi harness: https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L454
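To illustrate the idea in the comment above, here is a minimal sketch of map semantics layered on a multimap. It is only an assumption about the general shape of such an adapter: `MapSketchMultimap` and `MapOverMultimapSketch` are hypothetical in-memory stand-ins and do not reproduce Beam's `MultimapState` or `MapState` interfaces, nor the code in `FnApiStateAccessor`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a multimap-style state cell; this is not Beam's MultimapState API. */
final class MapSketchMultimap<K, V> {
  private final Map<K, List<V>> entries = new HashMap<>();

  /** Appends a value under the key; a multimap may hold many values per key. */
  void put(K key, V value) {
    entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /** Removes every value stored under the key. */
  void remove(K key) {
    entries.remove(key);
  }

  /** Returns the values stored under the key, or an empty list. */
  List<V> get(K key) {
    return entries.getOrDefault(key, new ArrayList<>());
  }
}

/** Map semantics on top of the multimap: each key holds at most one value. */
final class MapOverMultimapSketch<K, V> {
  private final MapSketchMultimap<K, V> backing = new MapSketchMultimap<>();

  void put(K key, V value) {
    // Clear any previous value first so the key stays single-valued.
    backing.remove(key);
    backing.put(key, value);
  }

  /** Returns the single value for the key, or null if the key is absent. */
  V get(K key) {
    List<V> values = backing.get(key);
    return values.isEmpty() ? null : values.get(0);
  }

  void remove(K key) {
    backing.remove(key);
  }
}
```

The only map-specific step is the remove-before-put in `put`; reads and removals delegate directly to the backing multimap.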
Re: [I] Support SetState in Dataflow runner [beam]
scwhittle commented on issue #18140: URL: https://github.com/apache/beam/issues/18140#issuecomment-2129581936 We could do so by implementing it on top of the existing MultimapState, similar to how it is done for the fnapi harness: https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/FnApiStateAccessor.java#L370
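A comparable sketch for set semantics, under the assumption that elements are stored as multimap keys with a placeholder value. `SetSketchMultimap` and `SetOverMultimapSketch` are hypothetical in-memory stand-ins, not Beam's `MultimapState` or `SetState` interfaces, and the placeholder-value choice is an illustration rather than a description of the harness code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a multimap-style state cell; this is not Beam's MultimapState API. */
final class SetSketchMultimap<K, V> {
  // LinkedHashMap keeps insertion order so reads are deterministic in this sketch.
  private final Map<K, List<V>> entries = new LinkedHashMap<>();

  void put(K key, V value) {
    entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  void remove(K key) {
    entries.remove(key);
  }

  boolean containsKey(K key) {
    return entries.containsKey(key);
  }

  List<K> keys() {
    return new ArrayList<>(entries.keySet());
  }
}

/** Set semantics on top of the multimap: elements are keys, values are a placeholder. */
final class SetOverMultimapSketch<T> {
  private static final Boolean PRESENT = Boolean.TRUE;
  private final SetSketchMultimap<T, Boolean> backing = new SetSketchMultimap<>();

  /** Adds the element; returns true only if it was not already present. */
  boolean add(T element) {
    if (backing.containsKey(element)) {
      return false;
    }
    backing.put(element, PRESENT);
    return true;
  }

  boolean contains(T element) {
    return backing.containsKey(element);
  }

  void remove(T element) {
    backing.remove(element);
  }

  /** Returns the current elements in insertion order. */
  List<T> read() {
    return backing.keys();
  }
}
```

Membership checks become key lookups, so the set never needs to read the stored values.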
[PR] Add iceberg load test [beam]
ahmedabu98 opened a new pull request, #31392: URL: https://github.com/apache/beam/pull/31392 Adding a load test for IcebergIO. Separating the integration test added in #31220 into its own suite.
Re: [PR] add support for ConnectionFactory ProviderFn in JmsIO [beam]
github-actions[bot] commented on PR #31264: URL: https://github.com/apache/beam/pull/31264#issuecomment-2129388979 Reminder, please take a look at this pr: @damondouglas @shunping
Re: [I] [Bug]: Watermarks and Windowing Not Working with FlinkRunner and KinesisIO Read Transform [beam]
yelianevich commented on issue #31085: URL: https://github.com/apache/beam/issues/31085#issuecomment-2129353403 @je-ik sorry for the basic question: is building the binaries locally and then running the test the only way to get them? Is there a faster way to get binaries?
Re: [PR] remove processing/scheduling logic from StreamingDataflowWorker [beam]
scwhittle commented on code in PR #31317: URL: https://github.com/apache/beam/pull/31317#discussion_r1613137200

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -87,6 +89,7 @@
 @SuppressWarnings({
 "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
+@NotThreadSafe

Review Comment:
internal annotation. Can you improve the class comment as well to help clarify the difference between Work and this? This is reused across work instances.

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -152,37 +154,22 @@ public boolean workIsFailed() {
 public void start(
 @Nullable Object key,
- Windmill.WorkItem work,
- Instant inputDataWatermark,
- @Nullable Instant outputDataWatermark,
- @Nullable Instant synchronizedProcessingTime,
+ Work work,
 WindmillStateReader stateReader,
 SideInputStateFetcher sideInputStateFetcher,
- Windmill.WorkItemCommitRequest.Builder outputBuilder,
- @Nullable Supplier workFailed) {
+ Windmill.WorkItemCommitRequest.Builder outputBuilder) {
 this.key = key;
-this.work = work;
-this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
+this.work = work.getWorkItem();

Review Comment:
Should we just store the work and get rid of workIsFailed?

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##
@@ -35,8 +42,13 @@ public abstract class ExecutionState {
 public abstract ExecutionStateTracker executionStateTracker();
- public static ExecutionState.Builder builder() {
-return new AutoValue_ExecutionState.Builder();
+ public final void close() {

Review Comment:
If it's selectively closed, it's probably better without AutoCloseable. I think I got a lint warning about creating something AutoCloseable outside a try block. Keep the comment though.

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutionState.java:
##
@@ -22,10 +22,19 @@
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 @AutoValue
-public abstract class ExecutionState {
+@Internal
+public abstract class ExecutionState implements AutoCloseable {

Review Comment:
Can you add a comment here? We have a bunch of context-y things for processing floating around. It would be good to note that this is per-computation and that it can be reused across bundles/work, with the things it contains being reset. Separately, I wonder if we should merge this and StreamingModeExecutionStateContext?

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##
@@ -193,31 +180,33 @@ public void start(
 for (StepContext stepContext : stepContexts) {
 stepContext.start(
 stateReader,
-inputDataWatermark,
+work.watermarks().inputDataWatermark(),

Review Comment:
Could pass in watermarks instead of separate params (could be a followup).

## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ExecuteWorkResult.java:
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+/** Value class that represents the result of executing user DoFns. */
+@AutoValue
+abstract class ExecuteWorkResult {

Review Comment:
seems like this
Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]
je-ik commented on issue #31313: URL: https://github.com/apache/beam/issues/31313#issuecomment-2129123482 I don't know the root cause; it seems that Flink does not send the snapshot state after restoring from a savepoint. I observed this on Impulse (I suspected that it affects only bounded sources running in unbounded mode, but that does not seem to be the case). It might be a Beam bug or a Flink bug.
> Hi, yes that did fix the issue. thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?
The flag turns on a different expansion for the Read transform: it uses a splittable DoFn (SDF), which in turn uses Impulse, which was fixed earlier. Performance should be similar to the classical Read.
[PR] [flink] #31390 emit watermark with empty source [beam]
je-ik opened a new pull request, #31391: URL: https://github.com/apache/beam/pull/31391 Closes #31390
Re: [I] [yaml]: Normalize BigtableIO [beam]
ffernandez92 commented on issue #28672: URL: https://github.com/apache/beam/issues/28672#issuecomment-2128911363 Hey @Polber, are you actively working on this issue at the moment? I may have some time in the next couple of weeks, so I could try to work on it unless you have already started.
Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]
akashk99 commented on issue #31313: URL: https://github.com/apache/beam/issues/31313#issuecomment-2128866628 I am actually noticing a lot of back pressure using this approach, despite downstream operators having low CPU usage. Is the fix for the root cause relatively straightforward, in which case I can implement it in a forked version of the repo? Or is it more involved?
Re: [I] [Bug]: KinesisIO source on FlinkRunner initializes the same splits twice [beam]
akashk99 commented on issue #31313: URL: https://github.com/apache/beam/issues/31313#issuecomment-2128818033 @je-ik Hi, yes that did fix the issue. thank you! For my understanding, what does this option do exactly? And should I expect any performance degradation?