Re: How windowing is implemented on Flink runner
Hi,

I wanted to follow up, as I had a similar case. Does this mean it is OK for Beam to use a sliding window of 1 day with a 1-second period (with a trigger other than the after-watermark trigger, to avoid emitting output from every window), and that there is no additional performance penalty (duplicated input messages in storage, or CPU spent resolving windows)? I'm interested in both the Flink and Dataflow perspectives (for both Python and Java). I ended up implementing the logic with Beam state and timers (which is quite performant and readable), but I'm also interested in other possibilities.

Best
Wiśniowski Piotr

On 12.06.2024 21:50, Ruben Vargas wrote:

I imagined so, but wasn't sure! Thanks for the clarification!

On Wed, Jun 12, 2024 at 1:42 PM Robert Bradshaw via user wrote:

Beam implements windowing itself (via state and timers) rather than deferring to Flink's implementation.

On Wed, Jun 12, 2024 at 11:55 AM Ruben Vargas wrote:

Hello guys,

Maybe a silly question, but does the Flink runner's window implementation use Flink windowing? Does that mean the runner will have the same performance issues as Flink itself? See this: https://issues.apache.org/jira/browse/FLINK-7001

I'm asking because the issue mentions several concepts that Beam already handles at the API level, so my suspicion is that the Beam model handles windowing a little differently from a pure Flink app. But I'm not sure.

Regards.
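To put a number on the 1-day / 1-second case: every element is logically assigned to size / period = 86,400 windows. The pure-Python sketch below reproduces the usual sliding-window assignment arithmetic (it is an illustration, not Beam's code; `sliding_windows_for` is a hypothetical name). Whether that fan-out becomes duplicated storage or CPU depends on how each runner represents multi-window elements, which this sketch does not model.

```python
from typing import List, Tuple


def sliding_windows_for(ts: int, size: int, period: int,
                        offset: int = 0) -> List[Tuple[int, int]]:
    """Return every [start, end) sliding window (in seconds) that contains ts.

    Windows start every `period` seconds (shifted by `offset`) and each one
    lasts `size` seconds.
    """
    # Latest window start at or before ts, aligned to period and offset.
    start = ts - (ts - offset) % period
    windows = []
    # Step back one period at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows


ONE_DAY = 24 * 60 * 60

# One element, 1-day windows sliding every second.
print(len(sliding_windows_for(ts=1_000, size=ONE_DAY, period=1)))  # 86400
```

When `size` is a multiple of `period`, the count is exactly size / period, regardless of the element's timestamp.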
Re: Watermark progress halt in Python streaming pipelines
Hi,

Getting back with an update: after updating `grpcio` the issues are gone. Thank you for the solution and the investigation. Feels like I owe you a beer :)

Best
Wiśniowski Piotr

On 24.04.2024 22:11, Valentyn Tymofieiev wrote:

On Wed, Apr 24, 2024 at 12:40 PM Wiśniowski Piotr wrote:

Hi! Thank you for the hint. We will try the mitigation from the issue. We have already tried everything from https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact , but let's hope upgrading the dependency will help. I will reply to this thread once I get confirmation. BTW, great job on the investigation of the bug you mentioned. Impressive. Seems like a nasty one.

Thanks. I was specifically recommending you check the recently added content under "It might be possible to retrieve stacktraces of a thread that is holding the GIL on a running Dataflow worker as follows:", as that should help find out what is causing the stuckness in your case. But hopefully it won't be necessary after you adjust the grpcio version.

Best,
Wiśniowski Piotr

On 24.04.2024 00:31, Valentyn Tymofieiev via user wrote:

You might be running into https://github.com/apache/beam/issues/30867. Among the error messages you mentioned, the following is closer to the root cause:

```Error message from worker: generic::internal: Error encountered with the status channel: There are 10 consecutive failures obtaining SDK worker status info from sdk-0-0. The last success response was received 3h20m2.648304212s ago at 2024-04-23T11:48:35.493682768+00:00. SDK worker appears to be permanently unresponsive. Aborting the SDK. For more information, see: https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```

If the mitigations in https://github.com/apache/beam/issues/30867 don't resolve your issue, please see https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact for instructions on how to find what causes the workers to be stuck.

Thanks!
Re: Hot update in Dataflow without losing messages
Hi,

I have pretty much the same setup. Regarding Terraform and Dataflow on GCP:

- `terraform apply` checks whether a Dataflow job with the same `job_name` is running.
- If there is none, it creates a new one and waits until it is in the "running" state.
- If there is one already, it tries to update the job: it creates a new job with the same `job_name` running the new version of the code, and sends an "update" signal to the old one. The old job then halts and waits for the new one to fully start and take over the old job's state. Once that's done, the old job goes into the "updated" state and the new one processes messages. If the new one fails, the old one resumes processing.
- Note: for this to work, the new code must be compatible with the old one. If it's not, the new job will fail, and the old job will fall slightly behind, since it had to wait for the new job to fail.
- Note 2: there is a way to only verify compatibility: the new job does not start, but a check confirms that it is compatible with the running job, which avoids possible delays in the old job.
- Note 3: there is an entirely separate job update type called "in-flight update", which does not really change the job but allows changing autoscaler parameters (like the maximum number of workers) without causing any delays in the pipeline.

Given the above context, more information is needed to fully diagnose your issue, but you might be hitting the issue mentioned by Robert:

- If you use a topic for PubSubIO, each new job creates a new subscription on the topic at graph construction time. So if there are messages in the old subscription that were not yet processed (and acked) by the old pipeline, and the old pipeline gets the "update" signal and halts, there may be a period during which messages are published to the old subscription but not to the new one.

Workarounds:

- use a subscription in PubSubIO, or
- use random job names in TF and drain the old pipelines.

Note that all of the above is just a hypothesis, but hopefully it is helpful.

Best
Wiśniowski Piotr

On 16.04.2024 05:15, Juan Romero wrote:

The deployment of the job is done by Terraform. I verified, and it seems Terraform does it incorrectly under the hood, because it stops the current job and starts a new one. Thanks for the information!

On Mon, 15 Apr 2024 at 6:42 PM Robert Bradshaw via user wrote:

Are you draining[1] your pipeline or simply canceling it and starting a new one? Draining should close open windows and attempt to flush all in-flight data before shutting down. For PubSub you may also need to read from subscriptions rather than topics to ensure messages are processed by either one or the other.

[1] https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain

On Mon, Apr 15, 2024 at 9:33 AM Juan Romero wrote:

Hi guys. Good morning.

I have done some tests with Apache Beam on Dataflow to see whether I can do a hot update (hot swap) while the pipeline is processing a bunch of messages that fall in a 10-minute time window. What I saw is that when I do a hot update on the pipeline while there are messages in the current time window (not yet sent to the target), the current job is shut down and Dataflow creates a new one. The problem is that I seem to be losing the messages that were being processed in the old job, and they are not picked up by the new one, which means we are losing data.

Can you help me or recommend a strategy?

Thanks!!
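The topic-versus-subscription hypothesis can be illustrated with a toy in-memory model of Pub/Sub fan-out, where a message is copied only to subscriptions that already exist when it is published. This is not the Pub/Sub client library or PubsubIO: `ToyTopic` and the two scenario functions are hypothetical, and the model ignores acks, redelivery and the Dataflow update handoff, assuming the old job stops pulling before the new job creates its own subscription.

```python
from collections import deque
from typing import Deque, Dict, List


class ToyTopic:
    """Toy fan-out: publish() copies a message into every subscription
    that exists at publish time, and only those."""

    def __init__(self) -> None:
        self.subscriptions: Dict[str, Deque[str]] = {}

    def create_subscription(self, name: str) -> None:
        self.subscriptions[name] = deque()

    def publish(self, msg: str) -> None:
        for backlog in self.subscriptions.values():
            backlog.append(msg)

    def pull_all(self, name: str) -> List[str]:
        backlog = self.subscriptions[name]
        out = list(backlog)
        backlog.clear()
        return out


def replace_job_reading_topic() -> List[str]:
    """Each job reads the topic through its own, newly created subscription."""
    topic = ToyTopic()
    topic.create_subscription("old-job-sub")
    topic.publish("m1")
    processed = topic.pull_all("old-job-sub")   # old job handles m1
    topic.publish("m2")                         # old job has halted for the update
    topic.create_subscription("new-job-sub")    # new job starts with a fresh subscription
    topic.publish("m3")
    processed += topic.pull_all("new-job-sub")  # new job only ever sees m3
    return processed                            # m2 is stranded in old-job-sub


def replace_job_reading_subscription() -> List[str]:
    """Both jobs read the same, pre-created subscription."""
    topic = ToyTopic()
    topic.create_subscription("shared-sub")
    topic.publish("m1")
    processed = topic.pull_all("shared-sub")
    topic.publish("m2")                         # old job halted; m2 waits in the backlog
    topic.publish("m3")
    processed += topic.pull_all("shared-sub")   # new job picks up m2 and m3
    return processed


print(replace_job_reading_topic())         # ['m1', 'm3']
print(replace_job_reading_subscription())  # ['m1', 'm2', 'm3']
```

Reading from a subscription that outlives both jobs keeps the backlog in one place, which is the point of Robert's suggestion.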
Re: Any recommendation for key for GroupIntoBatches
Hi,

I might be late to the discussion, but here is another option (I think it was not mentioned, or I missed it). Take a look at [this](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements), as I think this is precisely what you want to achieve. Compared to the other answers:

- it is elastic, so it fits any downstream use case;
- no custom code: it is a native Beam transform;
- no shuffling of the data is required, as the data is batched on the worker that already has it (but pay attention to your runner's maximum message size); shuffling would be required when creating artificial random-looking keys.

Note that the above is Python, but I bet there is a Java counterpart (or at least it is easy to implement).

Best
Wiśniowski Piotr

On 15.04.2024 19:14, Reuven Lax via user wrote:

There are various strategies. Here is an example of how Beam does it (taken from `Reshuffle.viaRandomKey().withNumBuckets(N)`). Note that this does some extra hashing to work around issues with the Spark runner. If you don't care about that, you could implement something simpler (e.g. initialize shard to a random number in StartBundle, and increment it mod numBuckets in each processElement call).

```java
public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues
    // with runners that don't properly hash the key being shuffled, but rely
    // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
    // which for Integer is a no-op and it is an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
    // spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}
```

On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas wrote:

Good day, Ruben,

Would you be able to compute a shasum on the group of IDs to use as the key?

Best,
Damon

On 2024/04/12 19:22:45 Ruben Vargas wrote:

> Hello guys
>
> Maybe this question was already answered, but I cannot find it and
> want some more input on this topic.
>
> I have some messages that don't have any particular key candidate
> except the ID, but I don't want to use it, because the idea is to
> group multiple IDs in the same batch.
>
> This is my use case:
>
> I have an endpoint to which I'm going to send the message ID; this endpoint
> will return certain information which I will use to enrich my
> message. In order to avoid calling the endpoint per message, I want to
> batch them in groups of 100 and send the 100 IDs in one request (the endpoint
> supports it). I was thinking of using GroupIntoBatches.
>
> - If I choose the ID as the key, my understanding is that it won't
> work the way I want (because it will form batches of the same ID).
> - Using a constant will be a problem for parallelism, is that correct?
>
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp, so I can have groups of messages that arrive
> in a certain second?
>
> Any suggestions would be appreciated.
>
> Thanks.
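A pure-Python sketch of the keying idea discussed above, assuming the goal is batches of at most 100 IDs: a per-instance counter starts at a random value, is incremented for every element, smeared with the same constants as the quoted `AssignShardFn` (kept as an unsigned 32-bit value, like the `UnsignedInteger` path), and reduced mod `num_buckets`; each key's IDs are then cut into batches, roughly what GroupIntoBatches does per key. `ShardAssigner` and `batch_by_key` are hypothetical names, and this runs outside Beam, so windowing, timers and bundle boundaries are ignored.

```python
import random
from collections import defaultdict
from typing import Dict, Iterable, List, Optional

MASK32 = 0xFFFFFFFF


def rotate_left32(x: int, n: int) -> int:
    """32-bit left rotation, matching Java's Integer.rotateLeft on the same bits."""
    x &= MASK32
    return ((x << n) | (x >> (32 - n))) & MASK32


def smear(shard: int) -> int:
    """Same constants as the quoted AssignShardFn, as an unsigned 32-bit result."""
    return (0x1B873593 * rotate_left32(shard * 0xCC9E2D51, 15)) & MASK32


class ShardAssigner:
    """Hypothetical helper: random starting counter, +1 per element,
    smeared and reduced mod num_buckets."""

    def __init__(self, num_buckets: int, seed: Optional[int] = None) -> None:
        self.num_buckets = num_buckets
        # Random start over the Java int range, like ThreadLocalRandom.nextInt().
        self.shard = random.Random(seed).randrange(-2**31, 2**31)

    def next_key(self) -> int:
        self.shard += 1
        return smear(self.shard) % self.num_buckets


def batch_by_key(ids: Iterable[str], num_buckets: int, batch_size: int,
                 seed: Optional[int] = None) -> Dict[int, List[List[str]]]:
    """Key each ID, then cut every key's IDs into batches of at most batch_size."""
    assigner = ShardAssigner(num_buckets, seed)
    per_key: Dict[int, List[str]] = defaultdict(list)
    for element_id in ids:
        per_key[assigner.next_key()].append(element_id)
    return {
        key: [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
        for key, items in per_key.items()
    }


batches = batch_by_key([f"id-{i}" for i in range(1000)],
                       num_buckets=4, batch_size=100, seed=7)
for key in sorted(batches):
    print(key, [len(b) for b in batches[key]])
```

The number of buckets bounds parallelism, while the batch size bounds the request size sent to the enrichment endpoint.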
Re: Watermark progress halt in Python streaming pipelines
Hi! Thank you for the hint. We will try the mitigation from the issue. We have already tried everything from https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact , but let's hope upgrading the dependency will help. I will reply to this thread once I get confirmation. BTW, great job on the investigation of the bug you mentioned. Impressive. Seems like a nasty one.

Best,
Wiśniowski Piotr

On 24.04.2024 00:31, Valentyn Tymofieiev via user wrote:

You might be running into https://github.com/apache/beam/issues/30867. Among the error messages you mentioned, the following is closer to the root cause:

```Error message from worker: generic::internal: Error encountered with the status channel: There are 10 consecutive failures obtaining SDK worker status info from sdk-0-0. The last success response was received 3h20m2.648304212s ago at 2024-04-23T11:48:35.493682768+00:00. SDK worker appears to be permanently unresponsive. Aborting the SDK. For more information, see: https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```

If the mitigations in https://github.com/apache/beam/issues/30867 don't resolve your issue, please see https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact for instructions on how to find what causes the workers to be stuck.

Thanks!
Watermark progress halt in Python streaming pipelines
Hi,

We are investigating an issue with our Python SDK streaming pipelines and have a few questions, but first some context.

Our stack:

- Python SDK 2.54.0, but we also tried 2.55.1.
- Dataflow Streaming Engine with the SDK in a container image (we also tried Prime).
- Currently our pipelines have low enough traffic that a single node handles it most of the time, but occasionally we do scale up.
- Deployment via the Terraform `google_dataflow_flex_template_job` resource, which normally performs a job update when Terraform is re-applied.
- We use `ReadModifyWriteStateSpec` a lot, along with other states and watermark timers, but we keep the size of the state under control.
- We use custom coders (Pydantic Avro).

The issue:

- Occasionally watermark progression stops. The issue is not deterministic and happens about 1-2 times per day for a few pipelines.
- No user code errors are reported, but we do get errors like these:

```INTERNAL: The work item requesting state read is no longer valid on the backend. The work has already completed or will be retried. This is expected during autoscaling events. [type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] { trail_point { source_file_loc { filepath: "dist_proc/windmill/client/streaming_rpc_client.cc" line: 767 } } }']```

```ABORTED: SDK harness sdk-0-0 disconnected. This usually means that the process running the pipeline code has crashed. Inspect the Worker Logs and the Diagnostics tab to determine the cause of the crash. [type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] { trail_point { source_file_loc { filepath: "dist_proc/dax/workflow/worker/fnapi_control_service.cc" line: 217 } } } [dist_proc.dax.MessageCode] { origin_id: 5391582787251181999 [dist_proc.dax.workflow.workflow_io_message_ext]: SDK_DISCONNECT }']```

```Work item for sharding key 8dd4578b4f280f5d tokens (1316764909133315359, 17766288489530478880) encountered error during processing, will be retried (possibly on another worker): generic::internal: Error encountered with the status channel: SDK harness sdk-0-0 disconnected. with MessageCode: (93f1db2f7a4a325c): SDK disconnect.```

```Python (worker sdk-0-0_sibling_1) exited 1 times: signal: segmentation fault (core dumped) restarting SDK process```

- We did manage to correlate this with either a vertical autoscaling event (when using Prime) or other worker replacements done by Dataflow under the hood, but this is not deterministic.
- For a few hours watermark progress stops, but other workers do process messages.
- And after a few hours:

```Error message from worker: generic::internal: Error encountered with the status channel: There are 10 consecutive failures obtaining SDK worker status info from sdk-0-0. The last success response was received 3h20m2.648304212s ago at 2024-04-23T11:48:35.493682768+00:00. SDK worker appears to be permanently unresponsive. Aborting the SDK. For more information, see: https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```

- Then the pipeline starts to catch up and the watermark progresses again.
- A job update via Terraform apply also fixes the issue.
- We do not see heavy use of worker memory or disk. CPU utilization is also close to idle most of the time. I do not think we use any C/C++ code from Python, nor do we use parallelism/threads outside Beam's own parallelization.

Questions:

1. What could be potential causes of such behavior? How can we get more insight into this problem?
2. I have seen `In Python pipelines, when shutting down inactive bundle processors, shutdown logic can overaggressively hold the lock, blocking acceptance of new work` listed as a known issue in the Beam release notes. What is the status of this? Could it be related?

I'd really appreciate any help, clues or hints on how to debug this issue.

Best regards
Wiśniowski Piotr
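For question 1, one generic CPython option (not taken from the thread or from the Dataflow guide) is the standard-library `faulthandler` module, which prints the Python traceback of every thread when the interpreter receives a fatal signal such as SIGSEGV, matching the `segmentation fault (core dumped)` line in the report. Calling it from per-worker setup code (for example a DoFn's `setup`) so it runs in each SDK process is an assumption about placement, and the helper names are hypothetical. It shows only Python frames, so a crash inside a native extension still needs the native-level procedure from the linked Dataflow troubleshooting page.

```python
import faulthandler


def enable_crash_tracebacks() -> bool:
    """Ask CPython to print the Python stack of every thread to stderr if the
    process receives SIGSEGV, SIGFPE, SIGABRT, SIGBUS or SIGILL."""
    faulthandler.enable(all_threads=True)
    return faulthandler.is_enabled()


def dump_stacks_to(path: str) -> None:
    """Write the current Python stack of every thread to `path` on demand."""
    with open(path, "w") as f:
        # faulthandler writes straight to the file descriptor, not via Python buffers.
        faulthandler.dump_traceback(file=f, all_threads=True)


print(enable_crash_tracebacks())  # True
```

Enabling it is cheap, so it can stay on in production workers; the traceback lands in whatever the worker captures from stderr.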
Re: [Question] Python Streaming Pipeline Support
Hi,

There are two more things I would suggest trying:

1. `PipelineOptions(beam_args, streaming=True)`

   The `streaming` flag changes how the runners operate. I'm not sure why, but I found it was required to get behavior similar to what you want. It might be required even if you do have unbounded sources in the pipeline. I use this successfully on both the direct runner and Dataflow.

2. `from apache_beam.testing.test_stream import ElementEvent, TestStream, WatermarkEvent`

   Please check out these classes. This is the most flexible and proper way to test streaming pipelines. Moreover, it allows you to control how the watermark progresses on the source, and maybe even processing-time triggers (though I have not managed to make that work yet). I use it heavily to simulate PubSub and Kafka sources, and even intermediate pipeline steps. It's rather poorly documented: it took me a few days to grasp how to handle multiple input streams with it, but I plan to write a post with my findings. I think I had a similar days-long investigation a few months ago.

Let me know if this is helpful.

Best
Wiśniowski Piotr

On 9.03.2024 03:29, Puertos tavares, Jose J (Canada) via user wrote:

Hello group:

I believe it might be interesting to share what I have found so far with your feedback, as I have corroborated that the Direct Runner and Flink Runner DO work in streaming, but it seems to be more of a constraint on the definition of the PCollection than on the operators, as shown in my code https://play.beam.apache.org/?sdk=python=TJNavCeJ_DS

Since most samples use a messaging system as the streaming source, I decided to use the Google Cloud PubSub emulator <https://cloud.google.com/pubsub/docs/emulator> for a setup where I push a message to the topic at the beginning, and create a pipeline that consumes the message from a subscription, applies the windowing, applies the group-by operation and at the end pushes the message again, providing a never-ending loop of consumption for streaming.

`gcloud beta emulators pubsub start --project=beam-sample`

Executing: cmd /c …. cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Mar 08, 2024 7:46:19 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
[pubsub] SLF4J: Defaulting to no-operation (NOP) logger implementation
[pubsub] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[pubsub] Mar 08, 2024 7:46:21 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085

Then, in another terminal, run the setup and the pipeline, following the emulator documentation <https://cloud.google.com/pubsub/docs/emulator#using_the_emulator> to create the fake topic and subscription with the publisher.py <https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/publisher.py> and subscriber.py <https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/subscriber.py> scripts:

`set PUBSUB_EMULATOR_HOST=localhost:8085`
`set PUBSUB_PROJECT_ID=beam-sample`

`publisher.py beam-sample create topic`
Created topic: projects/beam-sample/topics/topic

`subscriber.py beam-sample create topic subscription`
Subscription created: name: "projects/beam-sample/subscriptions/subscription" topic: "projects/beam-sample/topics/topic" push_config { } ack_deadline_seconds: 10 message_retention_duration { seconds: 604800 }

`publisher.py beam-sample publish topic`
1 2 3 4 5 6 7 8 9 Published messages to projects/beam-sample/topics/topic.

The Direct Runner looped in streaming as expected (although on my system it wasn't every 10 seconds):

`eternal_pubsub.py --streaming true`
Starting pipeline...
Message number 1
Message number 2
Message number 3
Message number 4
Message number 5
Message number 6
Message number 7
Message number 8
Message number 9
Messages from PubSub :9
Messages from PubSub :1
Messages from PubSub :1
Messages from PubSub :1
…

According to this post <https://stackoverflow.com/questions/68342095/error-while-running-beam-streaming-pipeline-python-with-pub-sub-io-in-embedded>, FlinkRunner doesn't support the PubSub operator, but I guess Kafka or another existing unbounded PCollection source would work; as I mentioned in my first post, the ones I have created use the “old I/O Java Source”.

To summarize, it seems the issue is less about support for group-by operations and more about unbounded collections. I’ll
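The TestStream idea (feed elements, then move the watermark explicitly, and observe when windows emit) can be illustrated without Beam. The sketch below is a toy model, not TestStream's API: `run_fixed_windows` and its tuple event format are hypothetical. It mimics, in simplified form, the default behavior of an event-time trigger with zero allowed lateness: a fixed window emits once the watermark reaches its end, and data arriving later for that window is dropped.

```python
from typing import Dict, List, Tuple


def run_fixed_windows(events: List[tuple], size: int):
    """Toy fixed-window grouping driven by explicit watermark events.

    events: ("element", timestamp, value) or ("watermark", timestamp) tuples.
    A window [start, start + size) is emitted once the watermark reaches its
    end; an element whose window was already emitted is dropped as late.
    """
    open_windows: Dict[int, List[str]] = {}
    watermark = float("-inf")
    fired: List[Tuple[int, int, List[str]]] = []
    dropped: List[str] = []
    for event in events:
        if event[0] == "element":
            _, ts, value = event
            start = ts - ts % size
            if start + size <= watermark:
                dropped.append(value)  # its window has already closed
            else:
                open_windows.setdefault(start, []).append(value)
        elif event[0] == "watermark":
            watermark = max(watermark, event[1])  # watermarks never move backwards
            for start in sorted(open_windows):
                if start + size <= watermark:
                    fired.append((start, start + size, open_windows.pop(start)))
        else:
            raise ValueError(f"unknown event kind: {event[0]!r}")
    return fired, dropped


events = [
    ("element", 1, "a"),
    ("element", 4, "b"),
    ("element", 12, "c"),
    ("watermark", 10),
    ("element", 3, "late"),
    ("watermark", 20),
]
fired, dropped = run_fixed_windows(events, size=10)
print(fired)    # [(0, 10, ['a', 'b']), (10, 20, ['c'])]
print(dropped)  # ['late']
```

Nothing is emitted until a watermark event arrives, which is also why a pipeline whose watermark never advances appears to produce no grouped output.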
Updating streaming Dataflow pipeline
Hi,

I have a question. I have long-running streaming Dataflow jobs built with the Python SDK. I noticed that when I update a job with exactly the same code that created it, the update may randomly fail (~50/50) with an error like `The Coder or type for step event_stats/join/dewindow.None/FromValue has changed`.

The `dewindow` step is just `WindowInto(GlobalWindows())`, so that the downstream stateful `DoFn` has its state global per key. I am pretty sure I did not change any types (as this is exactly the same code), the coders I use are deterministic (Pydantic Avro), and types are set for every `PCollection`.

What else should I look into that could result in such behavior?

Best
Wiśniowski Piotr
Re: [Question] Does Apache SnowflakeIO support Apache Parquet?
Hi Xinmin,

After a quick look at the connector in Java, it seems that the CSV format was an explicit decision. As this is entirely transparent to the user, I am just curious why you would like to use Parquet instead. What do you want to achieve?

Regards,
Wiśniowski Piotr

On 5.03.2024 05:47, Xinmin wrote:

Hello guru,

We would like to write data to Snowflake via an S3 bucket. Basically, the connector uploads the file in CSV format to the S3 bucket before copying the data into the table. Please refer to below.

[image.png]

Do you have any plans to support Apache Parquet?

Thanks.

Regards,
Xinmin
Re: Roadmap of Calcite support on Beam SQL?
Hi,

1. I do not have up-to-date knowledge, but Beam SQL was missing quite a lot of things regarding full Calcite support. I think the current way is to create a feature request on the repository and gather votes and interest. I would definitely vote for your initiative ;)

2. Regarding the query itself, I got it working with something like this:

```
WITH cte AS (
  SELECT CAST(event_datetime AS TIMESTAMP) AS ts
  FROM PCOLLECTION
)
SELECT
  CAST(TUMBLE_START(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS start_time,
  CAST(TUMBLE_END(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS end_time,
  COUNT(*) AS page_views
FROM cte
GROUP BY TUMBLE(cte.ts, INTERVAL '1' MINUTE)
;
```

Maybe it will be useful for you. Note that I am not up to date with recent versions of Beam SQL, but I will need to catch up (the syntax for defining a window on a table is cool).

Best
Wiśniowski Piotr

On 4.03.2024 05:27, Jaehyeon Kim wrote:

Hello,

I just tried a simple tumbling window, but it failed with the following error:

RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query WITH cte AS ( SELECT TO_TIMESTAMP(event_datetime) AS ts FROM PCOLLECTION ) SELECT CAST(window_start AS VARCHAR) AS start_time, CAST(window_end AS VARCHAR) AS end_time, COUNT(*) AS page_views FROM TABLE( TUMBLE(TABLE cte, DESCRIPTOR(ts), 'INTERVAL 1 MINUTE') ) GROUP BY window_start, window_end

I guess it is because TO_TIMESTAMP is not implemented. When I check the documentation, it is missing lots of functions. Is there any roadmap for Calcite support in Beam SQL?

Cheers,
Jaehyeon
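For checking results by hand, the plain-Python sketch below computes what the working `TUMBLE` query is meant to return: each timestamp is floored to its 1-minute tumbling window (aligned to the Unix epoch) and events are counted per window. It assumes `event_datetime` arrives as ISO-8601 strings; `tumbling_counts` is a hypothetical helper, and the string format Beam SQL produces for the VARCHAR casts may differ from Python's `str(datetime)`.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple


def tumbling_counts(event_datetimes: Iterable[str],
                    minutes: int = 1) -> List[Tuple[str, str, int]]:
    """Count events per tumbling window of `minutes`, aligned to the Unix epoch.

    Returns (window_start, window_end, count) rows sorted by window start,
    mirroring TUMBLE_START / TUMBLE_END / COUNT(*).
    """
    size = timedelta(minutes=minutes)
    counts: Counter = Counter()
    for raw in event_datetimes:
        ts = datetime.fromisoformat(raw)
        epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
        # Floor the timestamp to the start of its window.
        start = epoch + ((ts - epoch) // size) * size
        counts[start] += 1
    return [(str(s), str(s + size), n) for s, n in sorted(counts.items())]


print(tumbling_counts(["2024-03-04 10:00:05", "2024-03-04 10:00:59",
                       "2024-03-04 10:01:00"]))
# [('2024-03-04 10:00:00', '2024-03-04 10:01:00', 2), ('2024-03-04 10:01:00', '2024-03-04 10:02:00', 1)]
```

Note that an event exactly on a boundary (10:01:00) starts the next window, since tumbling windows are half-open intervals.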
Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error
Hi Evan,

I actually hit a similar problem a few months ago and finally managed to solve it. My situation is a bit different, as I am using the Python SDK and Dataflow Runner v2 (+ Streaming Engine, Prime), with quite a lot of stateful processing. In my case it failed with a very similar message, but related to some stateful step.

The thing is, I discovered that updating the pipeline in place fails even when submitting exactly the same code. It seems the problem was that the pipeline graph must be constructed in the same order as the original graph. In my case I had an unordered set of steps to add to the pipeline; this resulted in the same pipeline graph, but the order of construction does seem to matter, and updating the running job fails if the order is different. I simply sorted the steps by name before adding them to the pipeline, and updating the job on the fly started working.

So it seems that pipeline state on Dataflow somehow depends on the order in which steps are added to the pipeline, since some recent version (as I recall, this was working correctly around 2.50?). Does anyone know if this is intended? If so, I would like to hear an explanation.

Best regards
Wiśniowski Piotr

On 15.12.2023 00:14, Evan Galpin wrote:

The pipeline in question is using the Dataflow v1 Runner (Runner v2: Disabled), in case that's an important detail.

On Tue, Dec 12, 2023 at 4:22 PM Evan Galpin wrote:

I've checked the source code and deployment command for cases of setting experiments. I don't see "enable_custom_pubsub_source" being used at all, no. I also confirmed that it is not active on the existing/running job.

On Tue, Dec 12, 2023 at 4:11 PM Reuven Lax via user wrote:

Are you setting the enable_custom_pubsub_source experiment by any chance?

On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin wrote:

Hi all,

When attempting to upgrade a running Dataflow pipeline from SDK 2.51.0 to 2.52.0, an incompatibility warning is surfaced that prevents the pipeline upgrade:

The Coder or type for step .../PubsubUnboundedSource has changed

Was there an intentional coder change introduced for PubsubMessage in 2.52.0? I didn't see anything in the release notes https://beam.apache.org/blog/beam-2.52.0/ nor recent changes in PubsubMessageWithAttributesCoder[1]. Specifically, the step uses `PubsubMessageWithAttributesCoder` via `PubsubIO.readMessagesWithAttributes()`.

Thanks!

[1] https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36
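One plausible mechanism for the ordering problem in Python (an assumption; the thread does not confirm it): CPython randomizes string hashing per process, so iterating over a `set` of step names can yield a different order each time the pipeline is submitted from a new process, while sorting gives the same order every time. The sketch shows both; `STEP_NAMES` and the helper names are hypothetical, and the builder callables stand in for the code that would apply each step to the pipeline.

```python
import os
import subprocess
import sys
from typing import Callable, Dict, List

STEP_NAMES = ["parse", "enrich", "dedupe", "join_stats", "write_sink"]


def set_order_in_fresh_process(names: List[str], hash_seed: int) -> List[str]:
    """Iteration order of set(names) in a new interpreter with the given
    PYTHONHASHSEED, i.e. what a fresh submission process could see."""
    code = f"print(','.join(set({names!r})))"
    env = dict(os.environ, PYTHONHASHSEED=str(hash_seed))
    result = subprocess.run([sys.executable, "-c", code], env=env,
                            capture_output=True, text=True, check=True)
    return result.stdout.strip().split(",")


def apply_steps_in_stable_order(builders: Dict[str, Callable[[], None]]) -> List[str]:
    """The fix described in the thread: add steps sorted by name,
    whatever container they were collected in."""
    applied = []
    for name in sorted(builders):
        builders[name]()  # hypothetical: would apply this step to the pipeline
        applied.append(name)
    return applied


for seed in (1, 2, 3):
    print(seed, set_order_in_fresh_process(STEP_NAMES, seed))  # order may vary by seed
print(apply_steps_in_stable_order({name: (lambda: None) for name in set(STEP_NAMES)}))
# ['dedupe', 'enrich', 'join_stats', 'parse', 'write_sink']
```

Sorting is the simplest way to make construction order independent of hashing; any other fixed order would work equally well.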
Re: [Python SDK] PyArrow Critical Vulnerability
Hi Valentyn, Thank you for the information and details. It all makes sense! I think we can wait for the 2.53.0 release and apply the hotfix in the meantime. Best Wiśniowski Piotr On 10.11.2023 20:27, Valentyn Tymofieiev via user wrote: From https://pypi.org/project/pyarrow-hotfix/ : pyarrow_hotfix must be imported in your application or library code for it to take effect. Just installing the package is not sufficient. For Beam users, that means the pipeline code running on the workers would need to import this module on every worker, for example by adding this line to DoFn.setup, or in the main session (if the pipeline is composed of only one file AND uses the dill pickler with the --save_main_session flag). We will continue addressing this in https://github.com/apache/beam/issues/29392. On Fri, Nov 10, 2023 at 10:23 AM Valentyn Tymofieiev wrote: Hi Piotr, thanks for bringing this to the list. There is a FR to support pyarrow https://github.com/apache/beam/issues/28410 . I looked into it briefly in https://github.com/apache/beam/pull/28437 but saw some test failures, and it has been on the back burner. Given the news about the vulnerability, it would make sense to prioritize this. I think we could decouple this from the 2.52.0 release since: 1) there is a workaround; 2) new versions of pyarrow haven't been fully tested with Beam; 3) Beam 2.52.0 fixes some other issues that are known to affect users, e.g. https://github.com/apache/beam/issues/28246 From https://securityonline.info/cve-2023-47248-pyarrow-arbitrary-code-execution-vulnerability-a-critical-threat-to-data-analysts/ : > If you cannot upgrade to PyArrow 14.0.1, you can use the pyarrow-hotfix package to disable the vulnerability on older versions of PyArrow. However, this is not a permanent solution, and you should upgrade to PyArrow 14.0.1 as soon as possible. We could consider adding pyarrow-hotfix to the containers for the 2.52.0 release. CC: @Danny McCormick (release manager).
Beam users can also install this additional dependency via one of the ways described in https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/ . On Fri, Nov 10, 2023 at 4:42 AM Wiśniowski Piotr wrote: Hi, A few days ago this vulnerability was disclosed: https://securityonline.info/cve-2023-47248-pyarrow-arbitrary-code-execution-vulnerability-a-critical-threat-to-data-analysts/ I see that Beam 2.51.0 has `pyarrow<=12.0.0` in its requirements. 1. Is there a reason for not allowing newer versions of pyarrow? 2. Is there any planned effort to update this to `14.0.1`? Is it possible to push the update into the `2.52.0` Beam release? I know the Beam release is almost there. Best Wiśniowski Piotr
Fwd: [Python SDK] PyArrow Critical Vulnerability
Hi, A few days ago this vulnerability was disclosed: https://securityonline.info/cve-2023-47248-pyarrow-arbitrary-code-execution-vulnerability-a-critical-threat-to-data-analysts/ I see that Beam 2.51.0 has `pyarrow<=12.0.0` in its requirements. 1. Is there a reason for not allowing newer versions of pyarrow? 2. Is there any planned effort to update this to `14.0.1`? Is it possible to push the update into the `2.52.0` Beam release? I know the Beam release is almost there. Best Wiśniowski Piotr
Deployment approach for stateful streaming jobs
Hi, Are there any typical patterns for deploying stateful streaming pipelines with Beam? We target Dataflow and the Python SDK, with significant use of stateful processing over long windows (typically several days long). From our current practice of maintaining pipelines, we have identified 3 typical scenarios:
1. Deployment without breaking changes (no changes to the pipeline graph, coders, state, outputs, etc.): just update the Dataflow job in place.
2. Deployment with changes to internal state (changes to coders, state, or even the pipeline graph, but without changing the pipeline's input/output schemas): updating the job in place does not work here, because the state changed and reading state saved by the old pipeline would result in an error.
3. Deployment with changes to the output schema (and potentially to internal state too): we need to take special care when changing the output schema, so that downstream processes also have time to switch from the old version of the data to the new one.
Specifically, I need advice/patterns/knowledge on points 2 and 3. I guess they will require spinning up new pipelines with data backfilling or migration jobs? I would really appreciate detailed examples of how you deploy similar stateful streaming pipelines, ideally with details on how much data to reprocess to populate internal state, what needs to be done when changing a pipeline's output schema, and how to orchestrate all these activities. Best Wisniowski Piotr
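One way to make the three scenarios explicit in release tooling is a small decision helper that a CI step could run before a deployment. The sketch below is hypothetical: the `ChangeSet` fields and `Strategy` names are illustrative, not a Beam or Dataflow API, and the helper only encodes the taxonomy in the question. It does not answer how to rebuild state or migrate downstream consumers, which is what the question is really asking.

```python
from dataclasses import dataclass
from enum import Enum


class Strategy(Enum):
    """Illustrative labels for the three deployment scenarios."""
    UPDATE_IN_PLACE = 1             # scenario 1: no breaking changes
    NEW_JOB_REBUILD_STATE = 2       # scenario 2: coders/state/graph changed
    NEW_JOB_COORDINATED_OUTPUT = 3  # scenario 3: output schema changed


@dataclass(frozen=True)
class ChangeSet:
    """Hypothetical summary of what a release changes."""
    graph_changed: bool = False
    coders_or_state_changed: bool = False
    output_schema_changed: bool = False


def choose_strategy(change: ChangeSet) -> Strategy:
    # An output schema change dominates: downstream consumers need time to
    # switch from the old data version to the new one.
    if change.output_schema_changed:
        return Strategy.NEW_JOB_COORDINATED_OUTPUT
    # Following the taxonomy above, state saved by the old job is assumed
    # unreadable by the new one, so an in-place update is expected to fail.
    if change.coders_or_state_changed or change.graph_changed:
        return Strategy.NEW_JOB_REBUILD_STATE
    return Strategy.UPDATE_IN_PLACE
```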
Re: UDF/UADF over complex structures
Hi Gyorgy, I guess your problem might not be directly related to UDFs or UDAFs, but to the nested structure of your data. I also had a problem processing nested data and did not find a way to work it out either, especially when you are not in control of the input data structure. These issues seem closely related, as nested Rows need some polishing: - https://github.com/apache/beam/issues/26911 - https://github.com/apache/beam/issues/27733 I plan to jump on a fix for these two, and I guess it would not be hard, but I do not have any capacity for open-source tasks right now. Would anyone like to take a look at them? Best Wiśniowski Piotr On 28.09.2023 19:31, Balogh, György wrote: Sorry, I was not specific enough. I meant using SqlTransform registerUdf and registerUdaf. I use a lot of SQL in my pipeline, and in many cases I would prefer SQL UDFs over writing Beam transforms. I already have UDFs, but I did not find a way to make them work over nested structures. Thank you, Gyorgy On Thu, Sep 28, 2023 at 5:40 PM Robert Bradshaw via user wrote: Yes, for sure. This is one of the areas where Beam excels vs. simpler tools like SQL. You can write arbitrary code to iterate over arbitrary structures in the typical Java/Python/Go/Typescript/Scala/[pick your language] way. In the Beam nomenclature, UDFs correspond to DoFns and UDAFs correspond to CombineFns. On Thu, Sep 28, 2023 at 4:23 AM Balogh, György wrote: Hi, I have a complex nested structure in my input data. Is it possible to have a UDF/UDAF taking a nested structure as input? I'm using Java. Outputting a nested structure is also a question. Thank you, Gyorgy -- György Balogh CTO E gyorgy.bal...@ultinous.com M +36 30 270 8342 A HU, 1117 Budapest, Budafoki út 209. W www.ultinous.com
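To illustrate Robert's point that a UDAF maps to a CombineFn, here is a plain-Python sketch with the same four method names Beam's CombineFn uses (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`; the Java `Combine.CombineFn` has the camelCase equivalents). It walks a nested input and returns a nested output. The record shape (`commits[]` with an `author.name`) and `run_combine` are hypothetical; in a real Python pipeline the class would subclass `apache_beam.CombineFn`, and the runner, not `run_combine`, would drive the accumulators.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List


class CommitsPerAuthor:
    """Counts commits per author name across nested records (CombineFn-shaped)."""

    def create_accumulator(self) -> Counter:
        return Counter()

    def add_input(self, acc: Counter, record: Dict[str, Any]) -> Counter:
        # Walk the nested structure: record -> commits[] -> author -> name.
        for commit in record.get("commits", []):
            name = commit.get("author", {}).get("name")
            if name is not None:
                acc[name] += 1
        return acc

    def merge_accumulators(self, accs: Iterable[Counter]) -> Counter:
        merged = Counter()
        for acc in accs:
            merged.update(acc)  # Counter.update adds counts
        return merged

    def extract_output(self, acc: Counter) -> Dict[str, int]:
        return dict(sorted(acc.items()))


def run_combine(fn: CommitsPerAuthor, shards: List[List[Dict[str, Any]]]) -> Dict[str, int]:
    """Simulate a distributed combine: one accumulator per shard, then merge."""
    partials = []
    for shard in shards:
        acc = fn.create_accumulator()
        for record in shard:
            acc = fn.add_input(acc, record)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))
```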
Re: Issue with growing state/checkpoint size
Hi, Sorry for the late response; I was busy with a pretty similar problem. First, let me clarify the pipeline you use. You have a pipeline with a step that produces two stream outputs. They flow through some computation independently, and then you join them with CoGroupByKey in a session window and expect a match on message id. So you are trying to achieve a streaming inner join (which should emit output when the same message id arrives on both streams). Is that right?
My findings:
1. Session windows are very tricky to use with CoGroupByKey, as they are merging windows (so Beam may need to wait arbitrarily long, accumulating state, if one of the streams continuously outputs the same message id; this is the first thing to check in your case). This is only one of the potential corner cases with session windows. I would highly advise changing this to a plain sliding window in the processing-time domain, as that lets you clearly specify how long the step should keep waiting for a particular message id and how frequently it should emit.
2. There is also the case where one of the streams does not produce a message for a particular message id. Double-check that every message processed by your pipelines always emits exactly one message per message id. You also need to consider what to do with the state once a pair of messages has been joined and emitted. By default, session windows keep waiting for further messages with the same message id, accumulating state for some time afterwards. This might also be the source of your problem.
Now the solution that worked for me in a similar problem:
1. Window both streams into FixedWindows(1) in the processing-time domain. Make sure keys are set.
2. CoGroupByKey.
3. Window the output of CoGroupByKey into GlobalWindows.
4. Create a stateful ParDo (DoFn) with timers (to do cleanup) that captures state when a message arrives on one stream, emits both when both messages have arrived, takes the proper action (up to you) when further messages with the same key arrive, and does the cleanup.
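Step 4 above can be sketched in plain Python to show the per-key logic. This is an illustration, not Beam code: a dict replaces Beam's per-key state, an explicit `on_timer(now)` call replaces the cleanup timer, and `process` takes the co-grouped left/right value lists that CoGroupByKey would produce for one key. `StatefulInnerJoin`, `KeyState`, and the choice to ignore duplicates after a key has been joined are hypothetical; the "proper action" for repeated keys is up to you, as noted above.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class KeyState:
    left: Optional[str] = None
    right: Optional[str] = None
    emitted: bool = False
    first_seen: float = 0.0


@dataclass
class StatefulInnerJoin:
    """Per-key join state, standing in for a stateful DoFn in the global window.

    `ttl` plays the role of the cleanup timer: keys older than `ttl` seconds
    are dropped whether or not they were matched.
    """
    ttl: float
    state: Dict[str, KeyState] = field(default_factory=dict)

    def process(self, key: str, lefts: List[str], rights: List[str], now: float) -> List[Tuple[str, str, str]]:
        st = self.state.get(key)
        if st is None:
            st = KeyState(first_seen=now)
            self.state[key] = st
        if st.emitted:
            return []  # policy choice: ignore duplicates after a successful join
        if st.left is None and lefts:
            st.left = lefts[0]
        if st.right is None and rights:
            st.right = rights[0]
        if st.left is not None and st.right is not None:
            st.emitted = True
            return [(key, st.left, st.right)]
        return []

    def on_timer(self, now: float) -> List[str]:
        """Expire keys whose TTL has elapsed; returns the expired keys."""
        expired = [k for k, st in self.state.items() if now - st.first_seen >= self.ttl]
        for k in expired:
            del self.state[k]
        return expired
```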
Explanation: FixedWindows makes sure that elements are grouped into 1 s batches to assess the join; this is just to meet CoGroupByKey's requirement of having windows defined on the input streams. CoGroupByKey joins messages that arrive in the same one-second window. GlobalWindows makes sure that the stateful DoFn keeps state globally per key (as it manages its state by itself). The stateful DoFn does the waiting for the message and contains all your logic for the corner cases described above. The above just worked perfectly for me. Let me know if it's a solution for your case too. We might want to introduce a common API for such cases (it's quite easy to code this in a generic form). Best Wiśniowski Piotr On 1.09.2023 15:56, Byron Ellis via user wrote: Depends on why you're using a fan-out approach in the first place. You might actually be better off doing all the work at the same time. On Fri, Sep 1, 2023 at 6:43 AM Ruben Vargas wrote: Ohh I see. That makes sense. I wonder if there is a strategy for my use case, where I have an ID unique per pair of messages. Thanks for all your help! On Fri, Sep 1, 2023 at 6:51 AM Sachin Mittal wrote: Yes, a very high and non-deterministic cardinality can make the stored state of the join operation unbounded. In my case we knew the cardinality and it was not very high, so we could go with a lookup-based approach, using Redis to enrich the stream and avoid joins. On Wed, Aug 30, 2023 at 5:04 AM Ruben Vargas wrote: Thanks for the reply and the advice. One more thing: do you know if the key-space cardinality has an impact on this? I'm assuming it does, but in my case all the messages from the sources have a unique ID, which makes my key space huge, and that is not under my control. On Tue, Aug 29, 2023 at 9:29 AM Sachin Mittal wrote: So for the smaller collection, which does not grow in size for certain keys, we stored the data in Redis, and instead of a Beam join, our DoFn just did the lookup and got the data we needed.
On Tue, 29 Aug 2023 at 8:50 PM, Ruben Vargas wrote: Hello, Thanks for the reply. Is there any strategy you followed to avoid joins when you rewrote your pipeline? On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal wrote: Yes, we faced the same issue when trying to run a pipeline involving a join of two collections. It was deployed using AWS KDA, which uses the Flink runner. The source was Kinesis streams. Looks like join operations are not very
Re: Delete window information
Hi, This is a very typical usage. The Beam abstraction requires that every element belongs to a window (the default is a single global window), so it is not possible to delete the window information. What you really want is to overwrite the window information set for the first aggregation with new window information for the second aggregation. It is very typical in Beam pipelines to set new windows for each aggregation. In practice, for bounded sources I imagine you would just need `"back to global window" >> beam.WindowInto(GlobalWindows())` before the second aggregation. If you are working with an unbounded source (streaming aggregation), using the global window directly will fail, as it would need to wait forever to get all the data before emitting output. In this case you would also need to set a trigger and accumulation_mode to tell Beam when to emit elements for the aggregated keys. Hope this helps. If not, please provide a bit more detail on what exactly you are trying to do, so that I can try to help. You could also get more familiar with the abstraction with the help of https://tour.beam.apache.org/. Best Wiśniowski Piotr On 1.08.2023 09:17, Guagliardo, Patrizio via user wrote: Hi all, I have a question regarding Apache Beam in Python: when I create a window with timestamps and then do a group-by, the window information remains in the elements. Afterwards, I want to do another group-by (something like a cumsum) by certain keys, but that does not work, as the windows are still there, so it uses the keys + windows to split my data in the CombineFn. The question now: how can I “delete” the windows and timestamps so that I combine records just by the defined keys in the last CombineFn? Hope this is clear. Best, Patrizio
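Why the second combine still splits by window: in Beam, grouping operations such as GroupByKey and CombinePerKey group by key and window together. The plain-Python simulation below (no Beam dependency; `group_and_sum`, `rewindow_global`, and the `GLOBAL_WINDOW` marker are hypothetical stand-ins) shows that effect, and how replacing every element's window with a single global window, which is what the `WindowInto(GlobalWindows())` step suggested above does, lets the second aggregation combine by key alone. For unbounded input, the trigger caveat above still applies.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple

GLOBAL_WINDOW = "GlobalWindow"

# A windowed element: (key, window, value); windows here are (start, end) tuples.
Element = Tuple[str, Hashable, int]


def group_and_sum(elements: Iterable[Element]) -> Dict[Tuple[str, Hashable], int]:
    """Like a per-key combine in Beam: the grouping key is (key, window)."""
    sums: Dict[Tuple[str, Hashable], int] = defaultdict(int)
    for key, window, value in elements:
        sums[(key, window)] += value
    return dict(sums)


def rewindow_global(elements: Iterable[Element]) -> List[Element]:
    """Stand-in for WindowInto(GlobalWindows()): replace each element's window."""
    return [(key, GLOBAL_WINDOW, value) for key, _, value in elements]


if __name__ == "__main__":
    events = [("a", (0, 60), 1), ("a", (0, 60), 2), ("a", (60, 120), 4), ("b", (0, 60), 10)]
    first = group_and_sum(events)
    per_window = [(k, w, v) for (k, w), v in first.items()]
    print(group_and_sum(per_window))                   # still split by window
    print(group_and_sum(rewindow_global(per_window)))  # combined by key only
```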
Re: Beam SQL found limitations
Hi Kenn, Thanks for the clarification.
1. To put a concrete example in front: for every incoming event I need to find the previous event of the same user_id and pass previous_event_timestamp down in the current event's payload (the current event then becomes the previous event for future events of the same user). The question is how to do this with Beam SQL. I am aware that analytic windowing functions (like LAST_VALUE ... OVER) might not be the way to go for streaming, and I am OK with that; it makes sense given how it works under the hood, just as you mention. The task is to keep a simple state in streaming SQL. What I came up with is using a sliding window to have this state available for each new incoming event.
```
WITH unbounded_stream_initialized AS (
  SELECT user_id, event_time
  FROM unbounded_stream
  GROUP BY user_id, event_time, TUMBLE(event_time, INTERVAL '1' SECONDS)
  UNION ALL
  -- this is needed as the first session window by default starts at the first element, while here we need to start it in the past
  -- so that there is a window that ends just after the first real element
  SELECT CAST(0 AS BIGINT) AS user_id, CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
  FROM UNNEST(ARRAY[0]) AS arr -- dummy FROM, as the syntax does not allow GROUP BY directly after SELECT
  GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL '1' SECONDS)
),
test_data_1 AS (
  SELECT
    user_id,
    MAX(event_time) AS prev_event_time,
    HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS window_end_at
  FROM unbounded_stream_initialized
  GROUP BY
    user_id,
    HOP( -- first create a sliding window to aggregate state
      event_time,
      INTERVAL '1' SECONDS,
      INTERVAL '7' DAYS -- the idea is to have this quite long compared to the interval
    )
),
test_data_1_lookup AS (
  SELECT user_id, MAX(prev_event_time) AS prev_event_time
  FROM test_data_1
  GROUP BY
    user_id,
    -- then re-window into windows suitable for joining the main stream
    TUMBLE(window_end_at, INTERVAL '1' SECONDS)
),
enriched_info AS (
  SELECT
    unbounded_stream_initialized.event_time AS event_timestamp,
    unbounded_stream_initialized.user_id AS user_id,
    test_data_1_lookup.prev_event_time AS prev_event_time
  FROM unbounded_stream_initialized
  LEFT JOIN test_data_1_lookup
    ON unbounded_stream_initialized.user_id = test_data_1_lookup.user_id
)
SELECT * FROM enriched_info
```
My doubt is whether the above stores too much redundant data, as `test_data_1` suggests each incoming message could be duplicated and stored in every window of the sliding-window definition (which might be a lot in this case). Or is resolving whether a message belongs to a window actually done later, when evaluating the `LEFT JOIN`? The target runner is Dataflow. I am still learning Beam, so I may be missing something core about how this is processed.
2. Any hints on implementing a FirestoreIOTableProvider? Just roughly how to do it and where to look for the important parts. It seems we would need this functionality.
3. I will try to report more interesting findings. If possible, please prioritize fixing this ROW error.
Best Piotr On 26.05.2023 21:36, Kenneth Knowles wrote: Just want to clarify that Beam's concept of windowing is really an event-time based key, and all windows are processed logically simultaneously. SQL's concept of a windowing function is to sort rows and process them linearly. They are actually totally different. From your queries it seems you are interested in SQL's windowing functions (aka analytic functions). I am surprised by the problems with rows, since we have used them extensively. Hopefully it is not too hard to fix. Same with the UNION ALL problem. And for the CROSS JOIN, it seems it would be a nice feature to allow in some cases. Should not be hard. Thank you for reporting this! If you have time, it would be really great to get each of these reproducible problems into its own GitHub issue.
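For comparison with the sliding-window SQL above, the same "previous event per user" enrichment can be expressed with per-key state, roughly the state-and-timers approach. The plain-Python sketch below uses a dict in place of Beam's keyed state. It assumes each user's events arrive in event-time order (Beam does not guarantee ordering, so a real stateful DoFn would need to handle late or out-of-order events). It also omits the timer that would expire idle users' state, so the dict here grows with the number of users. `with_previous_event_time` is a hypothetical name.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple

Event = Tuple[int, float]  # (user_id, event_time in epoch seconds)


def with_previous_event_time(
    events: Iterable[Event],
) -> Iterator[Tuple[int, float, Optional[float]]]:
    """Emit (user_id, event_time, prev_event_time) for each event.

    A dict keyed by user_id stands in for per-key state: each event reads the
    stored timestamp, emits it, then becomes the new "previous" event.
    Assumes per-user events arrive in event-time order.
    """
    last_seen: Dict[int, float] = {}
    for user_id, event_time in events:
        prev = last_seen.get(user_id)
        yield user_id, event_time, prev
        last_seen[user_id] = event_time


if __name__ == "__main__":
    for row in with_previous_event_time([(1, 10.0), (2, 11.0), (1, 15.0)]):
        print(row)
```

Unlike a 7-day sliding window, this keeps one value per user, and the previous timestamp is available as soon as the next event for that user arrives.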
Kenn On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr wrote: Hi Alexey, Thank you for the reference to that discussion; I actually have pretty similar thoughts on what Beam SQL needs. Update from my side: I did find a workaround for the issue with windowing functions on a stream. It basically boils down to using a sliding window to collect and aggregate the state. But I would need advice on whether this is actually a cost-efficient method (targeting the Dataflow runner). My doubt is that this sliding window would need a sliding interval of less than 1 s and a size of more than a week, and be fed with quite frequent data. If I understand this correctly, it would mean each input row would need to be duplicated for each window
Re: Beam SQL found limitations
Hi Alexey, Thank you for the reference to that discussion; I actually have pretty similar thoughts on what Beam SQL needs. Update from my side: I did find a workaround for the issue with windowing functions on a stream. It basically boils down to using a sliding window to collect and aggregate the state. But I would need advice on whether this is actually a cost-efficient method (targeting the Dataflow runner). My doubt is that this sliding window would need a sliding interval of less than 1 s and a size of more than a week, and be fed with quite frequent data. If I understand this correctly, it would mean each input row would need to be duplicated for each window and stored, which could be a quite significant storage cost? Or does Beam actually not physically duplicate the record, but just track which windows the record currently belongs to? In my opinion, what Beam SQL really needs at the moment is bug fixes. Some bugs I found that prevent using it, for which I would really appreciate a fast fix:
- UNNEST ARRAY with a nested ROW (described below; created ticket https://github.com/apache/beam/issues/26911).
- The Pub/Sub table provider actually requires all table properties to be present (with null in `timestampAttributeKey` it fails), which essentially prevents using the Pub/Sub publish timestamp as `timestampAttributeKey`.
- It is not possible to cast VARCHAR to BYTES, and BYTES is needed for DataStoreV1TableProvider to provide a key for storage. Also consider updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it requires VARCHAR instead of BYTES; it's even easier to implement.
- Any hints on how to implement `FireStoreIOTableProvider`? I am considering implementing and contributing it, depending on my team's decision, but would like to get an idea of how hard this task is.
I will create tickets for the rest of the issues when I have some spare time.
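To put a number on the duplication concern: a sliding window assigns each element to every window that contains its timestamp. The sketch below counts those assignments, using what I believe is the same alignment rule as Beam's `SlidingWindows(size, period, offset)`: window starts fall on multiples of `period` shifted by `offset`, and `[s, s + size)` contains `ts` iff `s <= ts < s + size`. Treat it as an illustration. Whether a given runner physically stores one copy per window or tracks the assignments more compactly is runner-specific and is not answered by this arithmetic.

```python
from typing import List, Tuple


def sliding_windows(ts: int, size: int, period: int, offset: int = 0) -> List[Tuple[int, int]]:
    """Return the [start, end) windows that contain timestamp `ts`.

    All arguments use the same integer unit (e.g. seconds or milliseconds).
    """
    last_start = ts - ((ts - offset) % period)
    # Walk back one period at a time while the window still covers ts.
    return [(s, s + size) for s in range(last_start, ts - size, -period)]


def windows_per_element(size: int, period: int) -> int:
    # Ceiling of size / period: the number of windows each element lands in.
    return -(-size // period)


if __name__ == "__main__":
    week_s = 7 * 24 * 3600
    print(windows_per_element(week_s, 1))             # 7-day window, 1 s period
    print(windows_per_element(week_s * 1000, 100))    # same, 100 ms period (in ms)
```

With a 7-day size and a 1-second period, every element belongs to 604,800 windows, and with a 100 ms period, to 6,048,000. That is why a keyed-state approach is usually considered for "previous event" lookups instead.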
Best regards Wiśniowski Piotr On 22.05.2023 18:28, Alexey Romanenko wrote: Hi Piotr, Thanks for the details! I cross-posted this to dev@ as well since, I guess, people there can provide more insights on this. A while ago, I faced similar issues trying to run Beam SQL against the TPC-DS benchmark. We had a discussion around that [1]; please take a look, since it can be helpful. [1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b -- Alexey On 18 May 2023, at 11:36, Wiśniowski Piotr wrote: Hi, After experimenting with Beam SQL I found some limitations. I am testing on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner and openjdk version "11.0.19". Please let me know if some of them are known, being worked on, have tickets, or have an estimated fix time. I believe most of them are low-hanging fruit, or my thinking about the problem is just not right. If that is the case, please guide me to a working solution. From my perspective it is OK to have a fix just on master; no need to wait for a release. Priority order:
- 7. Windowing function on a stream. In detail: how to get the previous message for a key? Setting the expiration arbitrarily big is OK, but access to the previous record must happen fairly quickly, not wait for the big window to finish and emit the expired keys. Ideally I would like to do it in a pure Beam pipeline, as saving to an external key/value store and then reading it back could potentially result in race conditions, which I would like to avoid; but if it's the only option, so be it.
- 5. Only a single UNION ALL is possible.
- 4. UNNEST ARRAY with nested ROW.
- 3. Using * when there is a Row type present in the schema.
- 1. `CROSS JOIN` between two unrelated tables is not supported, even if one is a static number table.
- 2. ROW construction is not supported. It is not possible to nest data.
Below are the queries I use to test these scenarios. Thank you for looking at these topics! Best Wiśniowski Piotr
---
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
---
-- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.
-- It is not possible to number rows
WITH data_table AS (
  SELECT 1 AS a
),
number_table AS (
  SELECT numbers_exploded AS number_item
  FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
)
SELECT data_table.a, number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!
---
-- 2. ROW construction not supported. It is not possible to nest data
---
SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
SELECT MAP['field1','b','field2','a']; -- null
-- WORKAROUND - manually compose json string,
-- drawback - decomposing might be not supported or
Beam SQL found limitations
Hi, After experimenting with Beam SQL I found some limitations. I am testing on near-latest main (precisely `5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner and openjdk version "11.0.19". Please let me know if some of them are known, being worked on, have tickets, or have an estimated fix time. I believe most of them are low-hanging fruit, or my thinking about the problem is just not right. If that is the case, please guide me to a working solution. From my perspective it is OK to have a fix just on master; no need to wait for a release. Priority order:
- 7. Windowing function on a stream. In detail: how to get the previous message for a key? Setting the expiration arbitrarily big is OK, but access to the previous record must happen fairly quickly, not wait for the big window to finish and emit the expired keys. Ideally I would like to do it in a pure Beam pipeline, as saving to an external key/value store and then reading it back could potentially result in race conditions, which I would like to avoid; but if it's the only option, so be it.
- 5. Only a single UNION ALL is possible.
- 4. UNNEST ARRAY with nested ROW.
- 3. Using * when there is a Row type present in the schema.
- 1. `CROSS JOIN` between two unrelated tables is not supported, even if one is a static number table.
- 2. ROW construction is not supported. It is not possible to nest data.
Below are the queries I use to test these scenarios. Thank you for looking at these topics! Best Wiśniowski Piotr
---
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
---
-- Only supported is `CROSS JOIN UNNEST` when exploding array from same table.
-- It is not possible to number rows
WITH data_table AS (
  SELECT 1 AS a
),
number_table AS (
  SELECT numbers_exploded AS number_item
  FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
)
SELECT data_table.a, number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!
---
-- 2. ROW construction not supported. It is not possible to nest data
---
SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
SELECT MAP['field1','b','field2','a']; -- null
-- WORKAROUND - manually compose json string,
-- drawback - decomposing might be not supported or would need to be also based on string operations
SELECT ('{"field1":"'||1||'","field2":"'||'a'||'"}') AS `json_object`;
---
-- 3. Using * when there is Row type present in the schema
---
CREATE EXTERNAL TABLE test_tmp_1(
  `ref` VARCHAR,
  `author` ROW<
    `name` VARCHAR,
    `email` VARCHAR
  >
)
TYPE text
LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';
SELECT * FROM test_tmp_1;
-- java.lang.NoSuchFieldException: name
-- WORKAROUND - refer to columns explicitly with alias
SELECT
  `ref` AS ref_value,
  test_tmp_1.`author`.`name` AS author_name, -- table name must be referenced explicitly - this could be fixed too
  test_tmp_1.`author`.`email` AS author_email
FROM test_tmp_1;
---
-- 4. UNNEST ARRAY with nested ROW
---
CREATE EXTERNAL TABLE test_tmp(
  `ref` VARCHAR,
  `commits` ARRAY
)
TYPE text
LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
SELECT
  test_tmp.`ref` AS branch_name,
  commit_item.`id` AS commit_hash,
  commit_item.`author`.`name` AS author_name
FROM test_tmp
CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
-- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}}, Field{name=commits, description=, type=ARRAYSTRING, author ROW> NOT NULL>, options={{}}}, Field{name=id, description=, type=STRING, options={{}}}, Field{name=author, description=, type=ROW, options={{}}}). initialized with 5 fields.
-- limited WORKAROUND - refer to array elements by index and UNION ALL the items into rows
-- note workaround that uses number table will not work as CROSS JOIN is not supported
WITH data_parsed AS (
  SELECT
    test_tmp.`ref` AS branch_id,
    test_tmp.commits[1].`id` AS commit_hash,
    test_tmp.commits[1].`author`.`name` AS author_name
  FROM test_tmp
  UNION ALL -- this unfortunately works only for two indexes
  SELECT
    test_tmp.`ref` AS branch_id,
    test_tmp.commits[2].`id` AS commit_hash,
    test_tmp.commits[2].`author`.`name` AS author_name
  FROM test_tmp
)
SELECT *
FROM data_parsed
WHERE author_name IS NOT NULL
;
-- better WORKAROUND - but tricky to get right (fragile)
WITH data_with_number_array AS (
  SELECT
    test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due t
Re: Can I batch data when i use JDBC write operation?
Hi, Not tested, but here are a few options that might solve your problem:
1. Go with separate read and write replicas of your DB, so that the write replica gets the inserts one by one, and live with this. Make sure to deduplicate the data before inserting to avoid potential collisions (this should not be a problem, but I am not sure how the subsystem would behave).
2. - Add a step to group the input data into a time window.
   - Then ingest the events from a window into a unique temp table (just with a plain `INSERT INTO`).
   - Then add a next step in the pipeline to trigger a merge operation from the temp table into your production table. Make sure the same connection session is used, or the temp table will be gone. I am also not sure whether it is possible to invoke only one command per window with the `WriteToJdbc` operator after the previous write finishes. This is up to your experimentation; does anyone with more experience know how to code it?
3. Another option is to:
   - aggregate events into a window, so that only one element (an array of events) is emitted per window;
   - somehow put this record into the statement as a single row parameter;
   - in subsequent CTEs, deserialize the array into multiple rows;
   - do an insert with update. So the SQL on the DB side would look something like:
```
-- PostgreSQL; assumes the window's events are bound as one JSON array parameter
WITH new_values (arr) AS (
  VALUES (?::json)
),
deser AS (
  -- expand the JSON array into rows shaped like mytable
  SELECT r.* FROM new_values, json_populate_recordset(NULL::mytable, new_values.arr) AS r
),
upsert AS (
  UPDATE mytable m
  SET field1 = nv.field1, field2 = nv.field2
  FROM deser nv
  WHERE m.id = nv.id
  RETURNING m.*
)
INSERT INTO mytable (id, field1, field2)
SELECT id, field1, field2
FROM deser
WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = deser.id);
```
The above is an untested sketch, but it could be a hint for you. There is also a great answer on this here: https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291
4. Some mix of points 2 and 3.
Hopefully this helps you break down the problem. Best regards Wiśniowski Piotr On 21.04.2023 01:34, Juan Romero wrote: Hi.
Can someone help me with this? On Wed, Apr 19, 2023 at 15:08, Juan Romero () wrote: Hi community. This time I have a question about how to read a stream from Kafka and write batches of data with the JDBC connector. The idea is to overwrite a specific row if the row we want to insert has the same id and a greater load_date_time. The conceptual pipeline looks like this, and it is working (keep in mind that the source will be a stream from Kafka):

import typing

import apache_beam as beam
from apache_beam.io.jdbc import WriteToJdbc

ExampleRow = typing.NamedTuple(
    'ExampleRow', [('id', int), ('name', str), ('load_date_time', str)])

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            ExampleRow(1, '', '2023-04-05 12:34:56'),
            ExampleRow(1, 'yyyz', '2023-04-05 12:34:55'),
        ]).with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/postgres',
            username='postgres',
            password='postgres',
            table_name='test',
            connection_properties="stringtype=unspecified",
            # Upsert: only overwrite when the incoming row is newer.
            statement=(
                'INSERT INTO test VALUES(?,?,?) '
                'ON CONFLICT (id) '
                'DO UPDATE SET name = EXCLUDED.name, load_date_time = EXCLUDED.load_date_time '
                'WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp'
            ),
        ))

My question is: if I want to write a stream that comes from Kafka, how can I avoid the JDBC connector inserting the records one statement at a time, and instead insert the data in time-based batches? JDBC probably has some kind of internal "intelligence" for this, but I want to know what you think about it. Thank you!
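A minimal sketch of the batching side of options 2 and 3, in plain Python with no Beam or JDBC calls. `batch_by_window` is a hypothetical helper: it assigns each timestamped row to a fixed window, keeps only the newest row per `id` within that window (mirroring the `load_date_time` condition in the original statement), and serializes the survivors into one JSON array per window. In a pipeline, that JSON string would be the single bound parameter of an upsert such as the JSON-array statement sketched earlier in the thread. That wiring, and the integer `ts` used for windowing, are assumptions. `ExampleRow` is re-declared so the sketch is self-contained.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, NamedTuple, Tuple


class ExampleRow(NamedTuple):
    id: int
    name: str
    load_date_time: str  # 'YYYY-MM-DD HH:MM:SS' compares correctly as a string


def batch_by_window(
    timestamped_rows: Iterable[Tuple[int, ExampleRow]], window_s: int
) -> Dict[int, str]:
    """Group rows into fixed windows; return one JSON array per window start.

    Within a window, only the newest row per id (by load_date_time) is kept,
    so the database receives one upsert parameter per window, not per row.
    """
    latest: Dict[int, Dict[int, ExampleRow]] = defaultdict(dict)
    for ts, row in timestamped_rows:
        window_start = ts - ts % window_s
        current = latest[window_start].get(row.id)
        if current is None or row.load_date_time > current.load_date_time:
            latest[window_start][row.id] = row
    return {
        start: json.dumps([r._asdict() for r in sorted(rows.values())])
        for start, rows in sorted(latest.items())
    }


if __name__ == "__main__":
    rows = [
        (0, ExampleRow(1, "a", "2023-04-05 12:34:56")),
        (3, ExampleRow(1, "b", "2023-04-05 12:34:55")),  # older: dropped
        (12, ExampleRow(1, "d", "2023-04-05 13:00:00")),
    ]
    for start, payload in batch_by_window(rows, window_s=10).items():
        print(start, payload)
```

Deduplicating inside the window keeps each JSON array free of repeated ids. That matters because a single statement that tries to update the same row twice tends to fail or behave unpredictably.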
Re: [java] Trouble with gradle and using ParquetIO
Hi Evan, Just to give the full picture: - "provided" should be used when you expect the target cluster or environment to already have the package of interest installed, so you do not have to include it in the pipeline jar (this keeps the jar lightweight and makes it easier to maintain a coherent target JRE environment across an organization). - It seems that you should either install the library in your target environment or include it in your built jar, depending on your specific use case. Corporate environments typically provide commonly used libraries such as Spark and IO libraries, and this might be why Maven suggests this. Best Wiśniowski Piotr On 21.04.2023 08:30, Moritz Mack wrote: Hi Evan, Not sure why Maven suggests using “compileOnly”. That’s certainly wrong; make sure to use “implementation” in your case. Cheers, Moritz On 21.04.23, 01:52, "Evan Galpin" wrote: Hi all, I'm trying to make use of ParquetIO. Based on what's documented in Maven Central, I'm including the artifact in "compileOnly" mode (or in Maven parlance, 'provided' scope). I can successfully compile my pipeline, but when I run it I am (intuitively?) met with a ClassNotFound exception for ParquetIO. Is 'compileOnly' still the desired way to include ParquetIO as a pipeline dependency? Thanks, Evan
Re: Beam shell sql with zeta
Thank you for the reply and the hint.

1. Yes, I did try with Calcite `ROW` too, but then the transformation side (`SELECT * FROM etl_raw LIMIT 1`) fails with `java.lang.NoSuchFieldException: head (state=,code=0)`. Maybe I need to refer directly to the field I need instead of using `*`? Do you know off the top of your head what the syntax is to get the `repo_state.head.commit` value? I also tried adding more fields to the table definition, but no luck.

2. > doesn't actually do anything on the SQL shell at query parse time

This is also my observation. What is the proper way to initialize Zeta? I tried `./bin/shell --plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner` and a few other variations, but it seems that sqlline does not allow passing parameters to pipeline options. I do not have any requirement specific to Zeta, but it seems to have somewhat better support for JSON functions than Calcite.

3. Any other ideas for working around this issue?

Thanks for the support! For context, I am trying to set up a POC with these SQL queries, as this could really speed up development and maintenance of our pipelines, and SQL requires little background knowledge to understand the logic (read the code). If this works, it might be a great (killer) feature for Beam.

Best
Wisniowski Piotr

On 20.04.2023 20:52, Andrew Pilloud via user wrote: set plannerName doesn't actually do anything on the SQL shell at query parse time; it will still use the Calcite parser. Have you tried Calcite SQL? Support for structs is somewhat limited. I know there are bugs around nested structs and structs with single values. Andrew

On Thu, Apr 20, 2023 at 9:26 AM Wiśniowski Piotr wrote: Hi, I have a question regarding usage of Zeta with SQL extensions in SQL shell.
I try to:

```
SET runner = DirectRunner;
SET tempLocation = `/tmp/test/`;
SET streaming=`True`;
SET plannerName = `org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner`;

CREATE EXTERNAL TABLE etl_raw(
  event_timestamp TIMESTAMP,
  event_type VARCHAR,
  message_id VARCHAR,
  tracking_source VARCHAR,
  tracking_version VARCHAR,
  `repo_state` STRUCT<`head` STRUCT<`commit` VARCHAR, `name` VARCHAR>>
)
TYPE pubsub
LOCATION 'projects/xxx/topics/xxx'
TBLPROPERTIES '{"format":"json"}';
```

But I get the error `parse failed: Encountered "STRUCT" `. If I change `STRUCT` to `ROW` (as in Calcite), the DDL passes, but I still fail to receive data on `SELECT * FROM etl_raw LIMIT 1;`, with the exception `java.lang.NoSuchFieldException: head (state=,code=0)`, even though I am sure the field is there in the JSON payload. With the `repo_state` field commented out, I am able to retrieve the data. Unfortunately, I do not have control over the payload structure to make it flat, as it comes from a 3rd-party hook. In general, I am unable to parse a JSON message from Pub/Sub that has a structured field. Is anyone familiar with this part of Beam's functionality? Best regards Wisniowski Piotr
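If the SQL layer cannot read the nested `repo_state.head.commit` field, one possible workaround (an assumption, not something confirmed in this thread) is to flatten the JSON payload in a small Beam Python step before any SQL runs, so every nested value becomes a top-level column. The function name `flatten_payload` and the `__` separator are hypothetical choices.

```python
import json
from typing import Any, Dict


def flatten_payload(raw: bytes, sep: str = '__') -> Dict[str, Any]:
    """Flattens nested JSON objects into a single-level dict.

    {"repo_state": {"head": {"commit": "abc"}}} becomes
    {"repo_state__head__commit": "abc"}.
    """
    def _walk(obj: Any, prefix: str, out: Dict[str, Any]) -> None:
        if isinstance(obj, dict):
            # Recurse into nested objects, extending the column name.
            for key, value in obj.items():
                _walk(value, f'{prefix}{sep}{key}' if prefix else key, out)
        else:
            # Leaf value (string, number, list, null): store under the joined name.
            out[prefix] = obj

    flat: Dict[str, Any] = {}
    _walk(json.loads(raw), '', flat)
    return flat
```

In a Python pipeline this could run in a `beam.Map` between the Pub/Sub read and a schema-aware step (for example, building `beam.Row` objects for `SqlTransform`), so the SQL only sees flat `VARCHAR` columns. It does not help if the pipeline must stay purely in the SQL shell.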
Beam shell sql with zeta
Hi, I have a question regarding usage of Zeta with SQL extensions in SQL shell. I try to:

```
SET runner = DirectRunner;
SET tempLocation = `/tmp/test/`;
SET streaming=`True`;
SET plannerName = `org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner`;

CREATE EXTERNAL TABLE etl_raw(
  event_timestamp TIMESTAMP,
  event_type VARCHAR,
  message_id VARCHAR,
  tracking_source VARCHAR,
  tracking_version VARCHAR,
  `repo_state` STRUCT<`head` STRUCT<`commit` VARCHAR, `name` VARCHAR>>
)
TYPE pubsub
LOCATION 'projects/xxx/topics/xxx'
TBLPROPERTIES '{"format":"json"}';
```

But I get the error `parse failed: Encountered "STRUCT" `. If I change `STRUCT` to `ROW` (as in Calcite), the DDL passes, but I still fail to receive data on `SELECT * FROM etl_raw LIMIT 1;`, with the exception `java.lang.NoSuchFieldException: head (state=,code=0)`, even though I am sure the field is there in the JSON payload. With the `repo_state` field commented out, I am able to retrieve the data. Unfortunately, I do not have control over the payload structure to make it flat, as it comes from a 3rd-party hook. In general, I am unable to parse a JSON message from Pub/Sub that has a structured field. Is anyone familiar with this part of Beam's functionality? Best regards Wisniowski Piotr
Re: Backup event from Kafka to S3 in parquet format every minute
Hi Alexey,

I am just learning Beam and doing a POC that requires fetching stream data from Pub/Sub and partitioning it on GCS as Parquet files with a constant window. The catch is that I have an additional requirement to use ONLY SQL, and I did not manage to do it. My solutions either ran indefinitely or failed with `GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger`, despite having a window definition in the exact same CTE. You can find exactly what I tried here: https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph

This is where my knowledge of Beam ends. My hypothesis is that `WriteToParquet` supports only bounded data and does not automatically partition data. So the OP could use it by batching the data in memory (into individual windows) and then applying `WriteToParquet` to each collected batch individually. But this is more of a guess than knowledge, so please let me know if it is not correct. I cannot test it in my own solution, as I am limited to pure SQL, where I can only play with the table definition, and I did not see any table parameters that could be responsible for partitioning. If there are, please let me know. If you know that it is possible to read or write partitioned Parquet files with just a `PTransform`, that's great! I must have made some minor mistake in my trials, and I am eager to learn what it was.

Best regards
Wiśniowski Piotr

I did find a solution like this one: https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java. This could probably help the OP, since the OP tried to save the top-level `PCollection` as `Parquet` instead of saving each partition separately, as stated in `WriteOneFilePer

On 17.02.2023 18:38, Alexey Romanenko wrote: Piotr,

On 17 Feb 2023, at 09:48, Wiśniowski Piotr wrote: Does this mean that Parquet IO does not support partitioning, and we need to do some workarounds? Like explicitly mapping each window to a separate Parquet file?

Could you elaborate a bit more on this? IIRC, we used to read partitioned Parquet files with ParquetIO while running the TPC-DS benchmark. — Alexey
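The idea of "explicitly mapping each window to a separate Parquet file" comes down to computing, for each event timestamp, which fixed window it falls into and deriving an output prefix from that window. A pure-Python sketch of that arithmetic, assuming epoch-aligned windows (which is what Beam's `FixedWindows` does with its default offset of 0); the function names and the `<base>/<start>-<end>` path layout are hypothetical.

```python
from datetime import datetime, timezone
from typing import Tuple


def fixed_window(ts: float, size_secs: int = 60) -> Tuple[float, float]:
    """Returns the [start, end) bounds of the epoch-aligned fixed window containing ts."""
    start = ts - (ts % size_secs)
    return start, start + size_secs


def _iso_utc(ts: float) -> str:
    """Formats epoch seconds as a UTC timestamp without the offset suffix."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')


def window_file_prefix(ts: float, base: str, size_secs: int = 60) -> str:
    """Hypothetical per-window output prefix: <base>/<window start>-<window end>."""
    start, end = fixed_window(ts, size_secs)
    return f'{base}/{_iso_utc(start)}-{_iso_utc(end)}'
```

Every event in the same minute yields the same prefix, so all of them land in the same output location; a new prefix starts exactly at the next minute boundary.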
Re: Backup event from Kafka to S3 in parquet format every minute
Hi,

This sounds like exactly the problem I had a few emails ago: https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph

Does this mean that Parquet IO does not support partitioning and we need some workarounds, like explicitly mapping each window to a separate Parquet file? That could be a solution in your case if it works (just an idea worth trying; I have not tested it and do not have enough experience with Beam), but I am limited to pure SQL and not sure how I can do it there. I hope this helps with your problem, and that the Beam community can find a solution for my case too.

Best
Wiśniowski Piotr

On 17.02.2023 02:00, Lydian wrote: I want to make a simple Beam pipeline that stores the events from Kafka to S3 in Parquet format every minute. Here's a simplified version of my pipeline:

```python
from typing import Any

import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka


def add_timestamp(event: Any) -> Any:
    from datetime import datetime
    from apache_beam import window

    return window.TimestampedValue(event, datetime.timestamp(event[1].timestamp))


# Actual Pipeline
(
    pipeline
    | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
    | "Transformed" >> beam.Map(my_transform)
    | "Add timestamp" >> beam.Map(add_timestamp)
    | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
    | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
)
```

However, the pipeline failed with `GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger`.

This seems to come from https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146, which always adds a `GlobalWindows` and thus causes this error. What should I do to correctly back up the events from Kafka (unbounded) to S3? Thanks!

BTW, I am running with `portableRunner` on Flink. The Beam version is 2.41.0 (the latest version seems to have the same code) and the Flink version is 1.14.5.

Sincerely, Lydian Lee
Fwd: Beam SQL Extensions Windowing question/problem
Hi,

I am quite new to the Calcite and ZetaSQL SQL dialects, but I read all the docs I could find and still could not make the code below work. Any ideas what I could be doing wrong? (Beam 2.44 and the latest main builds behave the same, DirectRunner.)

```
CREATE EXTERNAL TABLE pub_sub_example(
  event_timestamp TIMESTAMP,
  `type` VARCHAR,
  `value` INTEGER
)
TYPE pubsub
LOCATION 'projects/some-project/topics/etl-raw-user-behaviour'
TBLPROPERTIES '{"format":"json"}';
```

The following correctly returns results:

```
SELECT * FROM pub_sub_example LIMIT 2;
```

But the following runs indefinitely:

```
SELECT
  `type` AS type_kind,
  TUMBLE_START(event_timestamp, INTERVAL '1' SECOND) AS win,
  COUNT(*) AS count_of_messages
FROM pub_sub_example
GROUP BY TUMBLE(event_timestamp, INTERVAL '1' SECOND), `type`
LIMIT 2;
```

Moreover, when I do:

```
CREATE EXTERNAL TABLE pub_sub_hist (
  type_kind VARCHAR,
  win TIMESTAMP,
  count_of_messages INTEGER
)
TYPE parquet
LOCATION 'gs://etl-user-behaviour/output'
TBLPROPERTIES '{"file_name_suffix":".parquet", "shard_name_template":"/S_of_N", "mime_type":"application/vnd.apache.parquet"}';

INSERT INTO SELECT
  `type` AS type_kind,
  TUMBLE_START(event_timestamp, INTERVAL '1' SECOND) AS win,
  COUNT(*) AS count_of_messages
FROM pub_sub_example
GROUP BY TUMBLE(event_timestamp, INTERVAL '1' SECOND), `type`;
```

I get an error that I cannot understand:

groupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. (state=,code=0)

I think I am providing a window with `TUMBLE(event_timestamp, INTERVAL '1' SECOND)` in the exact `GROUP BY` step, so why does it still treat the table as being in the `GlobalWindow`?

Moreover, I found in the Calcite SQL docs that this kind of windowing is deprecated (https://calcite.apache.org/docs/reference.html#grouped-window-functions), and even the Dataflow SQL examples use the new syntax (https://cloud.google.com/dataflow/docs/reference/sql/streaming-extensions), which is also present in Calcite (https://calcite.apache.org/docs/reference.html#tumble), but Beam SQL does not parse it. Any idea what the roadmap is for these features? I would really appreciate a quick reply.

Best regards
Wiśniowski Piotr
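For reference, the grouped-window query is meant to return, for each 1-second tumbling window and each `type`, the window start and the number of messages. A small pure-Python model of those semantics (an illustration only, not how Beam SQL executes the query; `tumble_counts` is a hypothetical name, and timestamps are naive datetimes aligned to the Unix epoch):

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple


def tumble_counts(events: Iterable[Tuple[datetime, str]],
                  size: timedelta = timedelta(seconds=1)) -> List[Tuple[str, datetime, int]]:
    """Counts events per (type, window start), mirroring
    GROUP BY TUMBLE(event_timestamp, size), `type` with TUMBLE_START and COUNT(*)."""
    epoch = datetime(1970, 1, 1)
    counts = Counter()
    for ts, kind in events:
        # Window start = epoch + floor((ts - epoch) / size) * size.
        start = epoch + ((ts - epoch) // size) * size
        counts[(kind, start)] += 1
    # Sorted by type, then window start, for a deterministic result.
    return sorted((kind, start, n) for (kind, start), n in counts.items())
```

On an unbounded Pub/Sub source, each such row can only be emitted once the watermark passes the end of its window, so one possible (unconfirmed) reason the windowed `SELECT ... LIMIT 2` appears to hang is that it is still waiting for the watermark to advance.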