Re: How windowing is implemented on Flink runner

2024-06-14 Thread Wiśniowski Piotr

Hi,

Wanted to follow up as I did have similar case.

So does this mean it is OK for Beam to use a sliding window of 1 day with a 1 
second period (using a trigger other than after-watermark to avoid emitting 
output from every window), and that there is no additional performance penalty 
(no duplication of input messages in storage, no extra CPU for resolving 
windows)? Interesting from both the Flink and Dataflow perspective 
(both Python and Java).


I ended up implementing the logic with Beam state and timers (which is 
quite performant and readable), but I am also interested in other possibilities.
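
For reference, a minimal sketch of the kind of state-and-timers DoFn I mean 
(Python SDK; the names, the float-seconds encoding and the 1-day expiry are 
illustrative, not our exact code):

```python
import apache_beam as beam
from apache_beam.coders import FloatCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)


class TrackPreviousEventFn(beam.DoFn):
    """Keeps the timestamp of the last event per key, expiring it after a day."""

    LAST_TS = ReadModifyWriteStateSpec('last_ts', FloatCoder())
    EXPIRY = TimerSpec('expiry', TimeDomain.WATERMARK)

    def process(
        self,
        element,  # (key, event)
        ts=beam.DoFn.TimestampParam,
        last_ts=beam.DoFn.StateParam(LAST_TS),
        expiry=beam.DoFn.TimerParam(EXPIRY),
    ):
        key, event = element
        previous = last_ts.read()            # None for the first event of a key
        last_ts.write(ts.micros / 1e6)       # store as float seconds
        expiry.set(ts + 24 * 60 * 60)        # clear the state after one day
        yield key, event, previous

    @on_timer(EXPIRY)
    def expire(self, last_ts=beam.DoFn.StateParam(LAST_TS)):
        last_ts.clear()


# Usage: the input must be keyed, e.g.
#   pcoll | beam.Map(lambda e: (e['user_id'], e)) | beam.ParDo(TrackPreviousEventFn())
```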


Best

Wiśniowski Piotr

On 12.06.2024 21:50, Ruben Vargas wrote:

I imagined it but wasn't sure!

Thanks for the clarification!

On Wed, Jun 12, 2024 at 1:42 PM Robert Bradshaw via user
 wrote:

Beam implements Windowing itself (via state and timers) rather than
deferring to Flink's implementation.

On Wed, Jun 12, 2024 at 11:55 AM Ruben Vargas  wrote:

Hello guys

May be a silly question,

But in the Flink runner, does the window implementation use Flink's
windowing? Does that mean the runner will have the same performance issues as
Flink itself? See this:
https://issues.apache.org/jira/browse/FLINK-7001

I'm asking because, looking at that issue, it mentions concepts
that Beam already handles at the API level. So my suspicion is that
the Beam model handles windowing a little differently from a pure
Flink app. But I'm not sure.


Regards.


Re: Watermark progress halt in Python streaming pipelines

2024-04-30 Thread Wiśniowski Piotr

Hi,

Getting back with an update. After updating `grpcio` the issues are gone. 
Thank you for the solution and investigation. Feels like I owe you a 
beer :)


Best

Wiśniowski Piotr

On 24.04.2024 22:11, Valentyn Tymofieiev wrote:



On Wed, Apr 24, 2024 at 12:40 PM Wiśniowski Piotr 
 wrote:


Hi!

Thank you for the hint. We will try the mitigation from the
issue. We had already tried everything from

https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
, but let's hope upgrading the dependency will help. I will
reply to this thread once I get confirmation.

BTW, great job on the investigation of the bug that you mentioned.
Impressive. Seems like a nasty one.

Thanks. I was specifically recommending you check the recently added 
content under "It might be possible to retrieve stacktraces of a 
thread that is holding the GIL on a running Dataflow worker as 
follows:", as that should help find out what is causing stuckness in 
your case. But hopefully it won't be necessary after you adjust the 
grpcio version.


Best,

Wiśniowski Piotr

On 24.04.2024 00:31, Valentyn Tymofieiev via user wrote:

You might be running into
https://github.com/apache/beam/issues/30867.

Among the error messages you mentioned, the following is closer
to the root cause: ```Error message from worker: generic::internal:
Error encountered with the status channel: There are 10
consecutive failures obtaining SDK worker status info from
sdk-0-0. The last success response was received 3h20m2.648304212s
ago at 2024-04-23T11:48:35.493682768+00:00. SDK worker appears to
be permanently unresponsive. Aborting the SDK. For more
information, see:

https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```


If mitigations in https://github.com/apache/beam/issues/30867
don't resolve your issue, please see

https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
for instructions on how to find what causes the workers to be stuck.

Thanks!

On Tue, Apr 23, 2024 at 12:17 PM Wiśniowski Piotr
 wrote:

Hi,
We are investigating an issue with our Python SDK streaming
pipelines, and have few questions, but first context.
Our stack:
- Python SDK 2.54.0 but we tried also 2.55.1
- DataFlow Streaming engine with sdk in container image (we
tried also Prime)
- Currently our pipelines do have low enough traffic, so that
single node handles it most of the time, but occasionally we
do scale up.
- Deployment by Terraform `google_dataflow_flex_template_job`
resource, which normally does job update when re-applying
Terraform.
- We do use a lot of `ReadModifyWriteStateSpec`, other states
and watermark timers, but we do keep the size of state
under control.
- We do use custom coders as Pydantic avro.
The issue:
- Occasionally watermark progression stops. The issue is not
deterministic, and happens about 1-2 times per day for a few pipelines.
- No user code errors reported- but we do get errors like this:
```INTERNAL: The work item requesting state read is no longer
valid on the backend. The work has already completed or will
be retried. This is expected during autoscaling events.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/windmill/client/streaming_rpc_client.cc" line: 767
} } }']```
```ABORTED: SDK harness sdk-0-0 disconnected. This usually
means that the process running the pipeline code has crashed.
Inspect the Worker Logs and the Diagnostics tab to determine
the cause of the crash.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/dax/workflow/worker/fnapi_control_service.cc"
line: 217 } } } [dist_proc.dax.MessageCode] { origin_id:
5391582787251181999
[dist_proc.dax.workflow.workflow_io_message_ext]:
SDK_DISCONNECT }']```
```Work item for sharding key 8dd4578b4f280f5d tokens
(1316764909133315359, 17766288489530478880) encountered error
during processing, will be retried (possibly on another
worker): generic::internal: Error encountered with the status
channel: SDK harness sdk-0-0 disconnected. with MessageCode:
(93f1db2f7a4a325

Re: Hot update in dataflow without losing messages

2024-04-28 Thread Wiśniowski Piotr

Hi,

I have a pretty similar setup. Regarding Terraform and Dataflow on GCP:

- `terraform apply` checks whether there is a Dataflow job running with the 
same `job_name`


- if there is not, it creates a new one and waits until it is in the 
"running" state


- if there is one already, it tries to update the job. This means it 
creates a new job with the same `job_name` running the new 
version of the code and sends an "update" signal to the old one. After 
that, the old job halts and waits for the new one to fully start and 
take over the state of the old job. Once that's done the old job goes 
into the "updated" state, and the new one processes messages. If the new one 
fails, the old one resumes processing.


- Note that for this to work the new code needs to be compatible with the 
old one. If it is not, the new job will fail, and the old job will get 
slightly behind as it needed to wait for the new job to fail.


- Note 2: there is a way to run only a compatibility check, so that the new job 
is not started, but there is a check to make sure the new code is compatible 
with the running job, hence avoiding possible delays in the old job.


- Note 3: there is an entirely separate job update type called "in-flight 
update" which does not actually change the job, but allows changing 
autoscaler parameters (like the max number of workers) without creating any 
delays in the pipeline.


Given the above context, more information is needed to fully diagnose your 
issue, but you might be hitting the issue mentioned by Robert:


- if you use a topic for PubSubIO, each new job creates a new 
subscription on the topic at graph construction time. This means that 
if there are messages in the old subscription that were not 
yet processed (and acked) by the old pipeline, and the old pipeline gets 
the "update" signal and halts, there might be some period during which 
messages are published to the old subscription but not to 
the new one.


Workarounds:

- use a subscription with PubSubIO (see the sketch below), or

- use random job names on TF and drain old pipelines.

Note that all of the above is just a hypothesis, but hopefully it is helpful.
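
For the first workaround, a minimal sketch of reading from a pre-created 
subscription instead of a topic (Python SDK; the project and subscription 
names are placeholders):

```python
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    messages = (
        p
        # Reading from a subscription created out of band (e.g. by Terraform)
        # means a replacement job attaches to the same backlog instead of
        # creating a fresh subscription at graph construction time.
        | "ReadFromPubSub" >> ReadFromPubSub(
            subscription="projects/my-project/subscriptions/my-subscription")
        | "Decode" >> beam.Map(lambda b: b.decode("utf-8"))
    )
```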

Best

Wiśniowski Piotr


On 16.04.2024 05:15, Juan Romero wrote:
The deployment of the job is done by Terraform. I verified, and it seems 
that Terraform does it incorrectly under the hood because it stops the 
current job and starts a new one. Thanks for the information!


On Mon, 15 Apr 2024 at 6:42 PM Robert Bradshaw via user 
 wrote:


Are you draining[1] your pipeline or simply canceling it and
starting a new one? Draining should close open windows and attempt
to flush all in-flight data before shutting down. For PubSub you
may also need to read from subscriptions rather than topics to
ensure messages are processed by either one or the other.

[1]
https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain

On Mon, Apr 15, 2024 at 9:33 AM Juan Romero  wrote:

Hi guys. Good morning.

I have done some tests in Apache Beam on Dataflow in
order to see if I can do a hot update or hot swap while
the pipeline is processing a bunch of messages that fall in a
time window of 10 minutes. What I saw is that when I do a hot
update on the pipeline and there are currently some messages
in the time window (before sending them to the target), the
current job is shut down and Dataflow creates a new one. The
problem is that it seems I am losing the messages that
were being processed in the old one and they are not taken by
the new one, which implies we are losing data.

Can you help me or recommend a strategy?

Thanks!!


Re: Any recommendation for key for GroupIntoBatches

2024-04-28 Thread Wiśniowski Piotr

Hi,

I might be late to the discussion, but here is another option (I 
think it was not mentioned, or I missed it). Take a look at 
[BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements), 
as I think this is precisely what you want to achieve.


Compared to other answers:

- this one is flexible enough to fit any downstream use case

- no custom code - a native Beam transform

- no shuffling of the data required, as elements are batched on the 
worker that already has the data (but pay attention to the max message size 
limit of your runner) - shuffling would be required when creating 
artificial random-looking keys.


Note that the above is Python, but I bet there is a Java counterpart (or at 
least it is easy to implement).
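
A minimal sketch of what I mean (Python SDK; the batch sizes and the 
enrichment function are illustrative):

```python
import apache_beam as beam
from apache_beam.transforms.util import BatchElements


def enrich_batch(ids):
    # One request per batch of up to 100 IDs; replace the body with the real
    # endpoint call (this stub is hypothetical).
    return [{"id": i} for i in ids]


with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([f"id-{n}" for n in range(1000)])
        # Batches are formed on the worker that already holds the elements -
        # no keys and no shuffle needed.
        | BatchElements(min_batch_size=100, max_batch_size=100)
        | beam.FlatMap(enrich_batch)
    )
```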


Best

Wiśniowski Piotr


On 15.04.2024 19:14, Reuven Lax via user wrote:
There are various strategies. Here is an example of how Beam does it 
(taken from Reshuffle.viaRandomKey().withNumBuckets(N)).


Note that this does some extra hashing to work around issues with the 
Spark runner. If you don't care about that, you could implement 
something simpler (e.g. initialize shard to a random number in 
StartBundle, and increment it mod numBuckets in each ProcessElement call).

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues with runners
    // that don't properly hash the key being shuffled, but rely on it being random-looking.
    // E.g. Spark takes the Java hashCode() of keys, which for Integer is a no-op and it is
    // an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}

On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas 
 wrote:


Good day, Ruben,

Would you be able to compute a shasum on the group of IDs to use
as the key?

Best,

Damon

On 2024/04/12 19:22:45 Ruben Vargas wrote:
> Hello guys
>
> Maybe this question was already answered, but I cannot find it  and
> want some more input on this topic.
>
> I have some messages that don't have any particular key candidate,
> except the ID,  but I don't want to use it because the idea is to
> group multiple IDs in the same batch.
>
> This is my use case:
>
> I have an endpoint where I'm gonna send the message ID, this
endpoint
> is gonna return me certain information which I will use to enrich my
> message. In order to avoid fetching the endpoint per message I
want to
> batch it in 100 and send the 100 IDs in one request ( the endpoint
> supports it) . I was thinking on using GroupIntoBatches.
>
> - If I choose the ID as the key, my understanding is that it won't
> work in the way I want (because it will form batches of the same
ID).
> - Use a constant will be a problem for parallelism, is that correct?
>
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp? so I can have groups of messages that
arrive
> at a certain second?
>
> Any suggestions would be appreciated
>
> Thanks.
>


Re: Watermark progress halt in Python streaming pipelines

2024-04-24 Thread Wiśniowski Piotr

Hi!

Thank you for the hint. We will try the mitigation from the issue. 
We had already tried everything from 
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact 
, but let's hope upgrading the dependency will help. I will reply to 
this thread once I get confirmation.


BTW, great job on the investigation of the bug that you mentioned. 
Impressive. Seems like a nasty one.


Best,

Wiśniowski Piotr

On 24.04.2024 00:31, Valentyn Tymofieiev via user wrote:

You might be running into https://github.com/apache/beam/issues/30867.

Among the error messages you mentioned, the following is closer to 
the root cause: ```Error message from worker: generic::internal: Error 
encountered with the status channel: There are 10 consecutive failures 
obtaining SDK worker status info from sdk-0-0. The last success 
response was received 3h20m2.648304212s ago at 
2024-04-23T11:48:35.493682768+00:00. SDK worker appears to be 
permanently unresponsive. Aborting the SDK. For more information, see: 
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact``` 


If mitigations in https://github.com/apache/beam/issues/30867 don't 
resolve your issue, please see 
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact 
for instructions on how to find what causes the workers to be stuck.


Thanks!

On Tue, Apr 23, 2024 at 12:17 PM Wiśniowski Piotr 
 wrote:


Hi,
We are investigating an issue with our Python SDK streaming
pipelines, and have few questions, but first context.
Our stack:
- Python SDK 2.54.0 but we tried also 2.55.1
- DataFlow Streaming engine with sdk in container image (we tried
also Prime)
- Currently our pipelines do have low enough traffic, so that
single node handles it most of the time, but occasionally we do
scale up.
- Deployment by Terraform `google_dataflow_flex_template_job`
resource, which normally does job update when re-applying Terraform.
- We do use a lot of `ReadModifyWriteStateSpec`, other states and
watermark timers, but we do keep the size of state under control.
- We do use custom coders as Pydantic avro.
The issue:
- Occasionally watermark progression stops. The issue is not
deterministic, and happens about 1-2 times per day for a few pipelines.
- No user code errors reported- but we do get errors like this:
```INTERNAL: The work item requesting state read is no longer
valid on the backend. The work has already completed or will be
retried. This is expected during autoscaling events.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/windmill/client/streaming_rpc_client.cc" line: 767 } }
}']```
```ABORTED: SDK harness sdk-0-0 disconnected. This usually means
that the process running the pipeline code has crashed. Inspect
the Worker Logs and the Diagnostics tab to determine the cause of
the crash.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/dax/workflow/worker/fnapi_control_service.cc" line: 217
} } } [dist_proc.dax.MessageCode] { origin_id: 5391582787251181999
[dist_proc.dax.workflow.workflow_io_message_ext]: SDK_DISCONNECT
}']```
```Work item for sharding key 8dd4578b4f280f5d tokens
(1316764909133315359, 17766288489530478880) encountered error
during processing, will be retried (possibly on another worker):
generic::internal: Error encountered with the status channel: SDK
harness sdk-0-0 disconnected. with MessageCode:
(93f1db2f7a4a325c): SDK disconnect.```
```Python (worker sdk-0-0_sibling_1) exited 1 times: signal:
segmentation fault (core dumped) restarting SDK process```
- We did manage to correlate this with either vertical autoscaling
event (when using Prime) or other worker replacements done by
Dataflow under the hood, but this is not deterministic.
- For few hours watermark progress does stop, but other workers do
process messages.
- and after few hours:
```Error message from worker: generic::internal: Error encountered
with the status channel: There are 10 consecutive failures
obtaining SDK worker status info from sdk-0-0. The last success
response was received 3h20m2.648304212s ago at
2024-04-23T11:48:35.493682768+00:00. SDK worker appears to be
permanently unresponsive. Aborting the SDK. For more information,
see:

https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```
   

Watermark progress halt in Python streaming pipelines

2024-04-23 Thread Wiśniowski Piotr

Hi,
We are investigating an issue with our Python SDK streaming pipelines, 
and have a few questions, but first some context.

Our stack:
- Python SDK 2.54.0, but we also tried 2.55.1
- Dataflow Streaming Engine with the SDK in a container image (we also tried 
Prime)
- Currently our pipelines have low enough traffic that a single 
node handles it most of the time, but occasionally we do scale up.
- Deployment by the Terraform `google_dataflow_flex_template_job` resource, 
which normally does a job update when re-applying Terraform.
- We do use a lot of `ReadModifyWriteStateSpec`, other states and watermark 
timers, but we do keep the size of state under control.

- We do use custom coders (Pydantic avro).
The issue:
- Occasionally watermark progression stops. The issue is not 
deterministic, and happens about 1-2 times per day for a few pipelines.

- No user code errors are reported, but we do get errors like this:
```INTERNAL: The work item requesting state read is no longer valid on 
the backend. The work has already completed or will be retried. This is 
expected during autoscaling events. 
[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] 
{ trail_point { source_file_loc { filepath: 
"dist_proc/windmill/client/streaming_rpc_client.cc" line: 767 } } }']```
```ABORTED: SDK harness sdk-0-0 disconnected. This usually means that 
the process running the pipeline code has crashed. Inspect the Worker 
Logs and the Diagnostics tab to determine the cause of the crash. 
[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto] 
{ trail_point { source_file_loc { filepath: 
"dist_proc/dax/workflow/worker/fnapi_control_service.cc" line: 217 } } } 
[dist_proc.dax.MessageCode] { origin_id: 5391582787251181999 
[dist_proc.dax.workflow.workflow_io_message_ext]: SDK_DISCONNECT }']```
```Work item for sharding key 8dd4578b4f280f5d tokens 
(1316764909133315359, 17766288489530478880) encountered error during 
processing, will be retried (possibly on another worker): 
generic::internal: Error encountered with the status channel: SDK 
harness sdk-0-0 disconnected. with MessageCode: (93f1db2f7a4a325c): SDK 
disconnect.```
```Python (worker sdk-0-0_sibling_1) exited 1 times: signal: 
segmentation fault (core dumped) restarting SDK process```
- We did manage to correlate this with either a vertical autoscaling event 
(when using Prime) or other worker replacements done by Dataflow under 
the hood, but this is not deterministic.
- For a few hours watermark progress stops, while other workers do 
process messages.

- and after a few hours:
```Error message from worker: generic::internal: Error encountered with 
the status channel: There are 10 consecutive failures obtaining SDK 
worker status info from sdk-0-0. The last success response was received 
3h20m2.648304212s ago at 2024-04-23T11:48:35.493682768+00:00. SDK worker 
appears to be permanently unresponsive. Aborting the SDK. For more 
information, see: 
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```

- And then the pipeline starts to catch up and the watermark progresses again.
- A job update via Terraform apply also fixes the issue.
- We do not see any excessive use of worker memory or disk. CPU 
utilization is also close to idle most of the time. I do not think we 
use C/C++ code with Python, nor parallelism/threads outside Beam's 
parallelization.

Questions:
1. What could be the potential causes of such behavior? How can we get more 
insight into this problem?
2. I have seen `In Python pipelines, when shutting down inactive bundle 
processors, shutdown logic can overaggressively hold the lock, blocking 
acceptance of new work` listed as a known issue in the Beam release notes. 
What is the status of this? Can it potentially be related?

We would really appreciate any help, clues or hints on how to debug this issue.
Best regards
Wiśniowski Piotr

Re: [Question] Python Streaming Pipeline Support

2024-03-08 Thread Wiśniowski Piotr

Hi,

There are two more things that I would suggest trying:

1.

PipelineOptions(
beam_args,
streaming=True,
)

The `streaming` flag changes the mode in which the runners operate. Not sure 
why, but I found it required to get the behavior you want. 
It might be required even if you do have unbounded sources in the 
pipeline.


I use this successfully on both the direct runner and Dataflow.

2.

from apache_beam.testing.test_stream import ElementEvent, TestStream, WatermarkEvent


Please check out these classes. This is the most flexible and proper way 
to test streaming pipelines. Moreover, it allows you to control how the 
watermark progresses at the source, and maybe even processing time triggers 
(but I have not managed to make that part work yet). I use this heavily to 
simulate PubSub and Kafka sources, and even intermediate pipeline stages 
too. It is kind of poorly documented, as it took me a few days to grasp how 
to handle this with multiple input streams, but I do plan to write a 
post with my findings.
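
A minimal sketch of what I mean (element values, timestamps and the windowing 
are illustrative):

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import window

# Simulate an unbounded source: elements arrive with timestamps and the
# watermark is advanced explicitly, so windows and triggers fire
# deterministically in the test.
stream = (
    TestStream()
    .add_elements([window.TimestampedValue("a", 0),
                   window.TimestampedValue("b", 5)])
    .advance_watermark_to(10)
    .add_elements([window.TimestampedValue("c", 12)])
    .advance_watermark_to_infinity()
)

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    _ = (
        p
        | stream
        | beam.WindowInto(window.FixedWindows(10))
        | beam.Map(lambda e: (e, 1))
        | beam.CombinePerKey(sum)
        | beam.Map(print)
    )
```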


I think I had a similar days-long investigation a few months ago. Let me 
know if this is helpful.


Best

Wiśniowski Piotr


On 9.03.2024 03:29, Puertos tavares, Jose J (Canada) via user wrote:



*Hello group:*

I believe it might be interesting to show what I have found so far 
with your feedback, as I have corroborated that the Direct Runner and 
Flink Runner DO work on streaming, but it seems more of a constraint 
on the definition of the PCollection rather than the operators, as 
shown in my code: 
https://play.beam.apache.org/?sdk=python=TJNavCeJ_DS


Based on the fact that most samples leverage a messaging system as the 
streaming source, I decided to use the Google Cloud Pub/Sub emulator 
<https://cloud.google.com/pubsub/docs/emulator> to have a setup 
where I push a message to the topic at the beginning, and create a 
pipeline that consumes the message from a subscription, applies the 
windowing, applies the group-by operation and at the end pushes the 
message again, hence providing the forever loop of consumption for 
streaming.


*gcloud beta emulators pubsub start --project=beam-sample*

Executing: cmd /c …. cloud-pubsub-emulator.bat --host=localhost 
--port=8085


[pubsub] This is the Google Pub/Sub fake.

[pubsub] Implementation may be incomplete or differ from the real system.

[pubsub] Mar 08, 2024 7:46:19 PM 
com.google.cloud.pubsub.testing.v1.Main main


[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL 
checks are not supported


[pubsub] SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

[pubsub] SLF4J: Defaulting to no-operation (NOP) logger implementation

[pubsub] SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder 
for further details.


[pubsub] Mar 08, 2024 7:46:21 PM 
com.google.cloud.pubsub.testing.v1.Main main


[pubsub] INFO: Server started, listening on 8085

In another terminal, run the setup and the pipeline, and follow the 
"using the emulator" documentation 
<https://cloud.google.com/pubsub/docs/emulator#using_the_emulator> to 
create the fake topic and subscription using the publisher.py 
<https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/publisher.py> 
and subscriber.py 
<https://raw.githubusercontent.com/googleapis/python-pubsub/main/samples/snippets/subscriber.py> 
scripts.


*set PUBSUB_EMULATOR_HOST=localhost:8085*

*set PUBSUB_PROJECT_ID=beam-sample*

*publisher.py beam-sample create topic*

Created topic: projects/beam-sample/topics/topic

*subscriber.py beam-sample create topic subscription*

Subscription created: name: 
"projects/beam-sample/subscriptions/subscription"


topic: "projects/beam-sample/topics/topic"

push_config {

}

ack_deadline_seconds: 10

message_retention_duration {

seconds: 604800

}

*publisher.py beam-sample publish topic*

1

2

3

4

5

6

7

8

9

Published messages to projects/beam-sample/topics/topic.

*Direct Runner* looped in streaming as expected (although in my system 
it wasn't every 10 seconds)


*eternal_pubsub.py --streaming true*

Starting pipeline...

Message number 1

Message number 2

Message number 3

Message number 4

Message number 5

Message number 6

Message number 7

Message number 8

Message number 9

Messages from PubSub :9

Messages from PubSub :1

Messages from PubSub :1

Messages from PubSub :1

…

As per this post 
<https://stackoverflow.com/questions/68342095/error-while-running-beam-streaming-pipeline-python-with-pub-sub-io-in-embedded>, 
the FlinkRunner doesn't support the PubSub operator, but then I guess Kafka 
or another existing unbounded PCollection generator would work, and as I 
mentioned in my first post, the ones that I have created are with the 
"old I/O Java Source".


To summarize, it seems it is less about the support of Group By 
operations and more about the unbounded collections… I'll

Updating streaming DataFlow pipeline

2024-03-06 Thread Wiśniowski Piotr

Hi

I have a question. I have long-running streaming Dataflow jobs written 
with the Python SDK. I noticed that when I update a job with exactly the 
same code that created it, it may randomly fail (~50/50) with an error like 
`The Coder or type for step event_stats/join/dewindow.None/FromValue has 
changed`.


The `dewindow` step is just `WindowInto(GlobalWindows())`, so that the 
downstream stateful `DoFn` has its state global per key. I am 
pretty sure that I did not change any types (as this is exactly the same code), 
the coders I use are deterministic (pydantic avro), and types are set 
for every `PCollection`. What else should I look into that could 
result in such behavior?


Best

Wiśniowski Piotr



Re: [Question] Does Apache SnowflakeIO support Apache Parquet?

2024-03-05 Thread Wiśniowski Piotr

Hi Xinmin,

After a quick look at the connector in Java, it seems that the CSV format 
was an explicit decision. As this is entirely transparent to the user, I am 
just curious why you would like to use Parquet instead. What do you want 
to achieve?


Regards,

Wiśniowski Piotr

On 5.03.2024 05:47, Xinmin wrote:

Hello guru,

We would like to write data to Snowflake with an S3 bucket; basically, 
the connector uploads the file in CSV format to the S3 bucket before 
copying the data into the table. Please refer to the image below.

image.png

Do you have any plans to support Apache Parquet? Thanks.

Regards,
Xinmin


Re: Roadmap of Calcite support on Beam SQL?

2024-03-04 Thread Wiśniowski Piotr

Hi,

1. I do not have up-to-date knowledge, but Beam SQL was missing quite a 
lot with regard to full Calcite support. I think the current way is 
to create a feature request on the repository and get votes and interest. I 
would definitely vote for your initiative ;)


2. Regarding the query itself, I got it working with something like this:
```

WITH cte AS (
    SELECT CAST(event_datetime AS TIMESTAMP) AS ts
    FROM PCOLLECTION
)
SELECT
    CAST(TUMBLE_START(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS start_time,
    CAST(TUMBLE_END(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS end_time,
    COUNT(*) AS page_views
FROM cte
GROUP BY
    TUMBLE(cte.ts, INTERVAL '1' MINUTE)
;

```

Maybe it will be useful for you. Note that I am not up to date with 
recent versions of Beam SQL, and I will need to catch up (the syntax for 
defining a window on a table is cool).
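
For completeness, a rough sketch of how I would run such a query from the 
Python SDK with SqlTransform (the input schema and values are illustrative; 
this also needs a Java expansion environment available, e.g. Docker):

```python
import typing

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform


class PageView(typing.NamedTuple):
    event_datetime: str


beam.coders.registry.register_coder(PageView, beam.coders.RowCoder)

QUERY = """
WITH cte AS (
    SELECT CAST(event_datetime AS TIMESTAMP) AS ts FROM PCOLLECTION
)
SELECT
    CAST(TUMBLE_START(cte.ts, INTERVAL '1' MINUTE) AS VARCHAR) AS start_time,
    COUNT(*) AS page_views
FROM cte
GROUP BY TUMBLE(cte.ts, INTERVAL '1' MINUTE)
"""

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([PageView("2024-03-04 10:00:30"),
                       PageView("2024-03-04 10:00:45")]).with_output_types(PageView)
        | SqlTransform(QUERY)
        | beam.Map(print)
    )
```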


Best

Wiśniowski Piotr

On 4.03.2024 05:27, Jaehyeon Kim wrote:

Hello,

I just tried a simple tumbling window but failed with the following error

RuntimeError: org.apache.beam.sdk.extensions.sql.impl.ParseException: 
Unable to parse query

    WITH cte AS (
        SELECT TO_TIMESTAMP(event_datetime) AS ts FROM PCOLLECTION
    )
    SELECT
        CAST(window_start AS VARCHAR) AS start_time,
        CAST(window_end AS VARCHAR) AS end_time,
        COUNT(*) AS page_views
    FROM TABLE(
            TUMBLE(TABLE cte, DESCRIPTOR(ts), 'INTERVAL 1 MINUTE')
        )
    GROUP BY
        window_start, window_end

I guess it is because TO_TIMESTAMP is not implemented. When I check 
the documentation, it is missing lots of functions. Is there any roadmap for 
Calcite support in Beam SQL?


Cheers,
Jaehyeon

Re: [Dataflow][Java][2.52.0] Upgrading to 2.52.0 Surfaces Pubsub Coder Error

2024-01-03 Thread Wiśniowski Piotr

Hi Evan,

I actually hit a similar problem a few months ago and finally managed to 
solve it. My situation is a bit different, as I am using the Python SDK and 
Dataflow Runner v2 (+ Streaming Engine, Prime), and quite a lot 
of stateful processing. In my case it failed with a very similar 
message, but related to some stateful step.


The thing is, I discovered that updating the pipeline in place can fail even 
when submitting exactly the same code. It seems the problem 
was that the pipeline graph must be constructed in the same order as the 
original graph. In my case I had an unordered set of steps that I added 
to the pipeline, resulting in the same pipeline graph, but it seems that the 
order of construction does matter, and the update of the running job fails if 
the order is different.


In my case I just sorted the steps to be added to the pipeline by name, and 
updating the job in flight started working. So it seems that the pipeline 
state on Dataflow somehow depends on the order in which steps are added to 
the pipeline since some recent version (as I recall this was working 
correctly around 2.50). Does anyone know if this is intended? If yes, I would 
like to see some explanation.


Best regards

Wiśniowski Piotr

On 15.12.2023 00:14, Evan Galpin wrote:
The pipeline in question is using Dataflow v1 Runner (Runner v2: 
Disabled) in case that's an important detail.


On Tue, Dec 12, 2023 at 4:22 PM Evan Galpin  wrote:

I've checked the source code and deployment command for cases of
setting experiments. I don't see "enable_custom_pubsub_source"
being used at all, no.  I also confirmed that it is not active on
the existing/running job.

On Tue, Dec 12, 2023 at 4:11 PM Reuven Lax via user
 wrote:

Are you setting the enable_custom_pubsub_source experiment by
any chance?

On Tue, Dec 12, 2023 at 3:24 PM Evan Galpin
 wrote:

Hi all,

When attempting to upgrade a running Dataflow pipeline
from SDK 2.51.0 to 2.52.0, an incompatibility warning is
surfaced that prevents pipeline upgrade:

The Coder or type for step .../PubsubUnboundedSource
has changed


Was there an intentional coder change introduced for
PubsubMessage in 2.52.0?  I didn't note anything in the
release notes https://beam.apache.org/blog/beam-2.52.0/
nor recent changes in
PubsubMessageWithAttributesCoder[1].  Specifically the
step uses `PubsubMessageWithAttributesCoder` via
`PubsubIO.readMessagesWithAttributes()`

Thanks!


[1]

https://github.com/apache/beam/blob/90e79ae373ab38cf4e48e9854c28aaffb0938458/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageWithAttributesCoder.java#L36


Re: [Python SDK] PyArrow Critical Vulnerability

2023-11-12 Thread Wiśniowski Piotr

Hi Valentyn,

Thank you for the information and details. It all makes sense! I think we can 
wait for the 2.53.0 release and apply the hotfix in the meantime.


Best

Wiśniowski Piotr

On 10.11.2023 20:27, Valentyn Tymofieiev via user wrote:

From https://pypi.org/project/pyarrow-hotfix/ :

pyarrow_hotfix must be imported in your application or library code 
for it to take effect.

Just installing the package is not sufficient:

For Beam users, that means that the pipeline code running on the 
workers would need to import this module on every worker, for example 
by adding this import to DoFn.setup or to the main session (if the pipeline is 
composed of only one file AND uses the dill pickler with the 
--save_main_session flag).
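
For example, a minimal sketch (assuming the pyarrow-hotfix package is 
installed on the workers, e.g. via requirements.txt or a custom container):

```python
import apache_beam as beam


class UsesPyArrowFn(beam.DoFn):
    def setup(self):
        # Importing the module on the worker applies the hotfix in this process.
        import pyarrow_hotfix  # noqa: F401

    def process(self, element):
        # ... code that uses pyarrow ...
        yield element
```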


We will continue addressing this in 
https://github.com/apache/beam/issues/29392.


On Fri, Nov 10, 2023 at 10:23 AM Valentyn Tymofieiev 
 wrote:


Hi Piotr, thanks for bringing this to the list.

There is a FR to support pyarrow:
https://github.com/apache/beam/issues/28410 . I looked into it
briefly in https://github.com/apache/beam/pull/28437 but saw some
test failures and it has been on the back burner. Given the news about
the vulnerability it would make sense to prioritize this.

I think we could decouple this from the 2.52.0 release since:
  1) there is a workaround
  2) new versions of pyarrow haven't been fully tested with Beam
  3) Beam 2.52.0 fixes some other issues that are known to be
affecting users, e.g. https://github.com/apache/beam/issues/28246

From
https://securityonline.info/cve-2023-47248-pyarrow-arbitrary-code-execution-vulnerability-a-critical-threat-to-data-analysts/ :

  > If you cannot upgrade to PyArrow 14.0.1, you can use the
pyarrow-hotfix package to disable the vulnerability on older
versions of PyArrow. However, this is not a permanent solution,
and you should upgrade to PyArrow 14.0.1 as soon as possible.

We could consider adding pyarrow-hotfix to the containers for the 2.52.0
release. CC: @Danny McCormick
<mailto:dannymccorm...@google.com> (release manager).

Beam users can also install this additional dependency via one of
the ways described in
https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
.



On Fri, Nov 10, 2023 at 4:42 AM Wiśniowski Piotr
 wrote:

Hi,

Few days ago this one was detected:

https://securityonline.info/cve-2023-47248-pyarrow-arbitrary-code-execution-vulnerability-a-critical-threat-to-data-analysts/

I do see that beam 2.51.0 does have `pyarrow<=12.0.0` in
requirements.

1. Is there a reason for not allowing newer versions of pyarrow?

2. Is there any planned effort on updating this to `14.0.1`?
Is it
possible to push the update to `2.52.0` beam release? I know
the beam
release is almost there.

Best

    Wiśniowski Piotr


Fwd: [Python SDK] PyArrow Critical Vulnerability

2023-11-10 Thread Wiśniowski Piotr

Hi,

Few days ago this one was detected: 
https://securityonline.info/cve-2023-47248-pyarrow-arbitrary-code-execution-vulnerability-a-critical-threat-to-data-analysts/


I do see that beam 2.51.0 does have `pyarrow<=12.0.0` in requirements.

1. Is there a reason for not allowing newer versions of pyarrow?

2. Is there any planned effort on updating this to `14.0.1`? Is it 
possible to push the update to `2.52.0` beam release? I know the beam 
release is almost there.


Best

Wiśniowski Piotr



Deployment approach for stateful streaming jobs

2023-10-03 Thread Wiśniowski Piotr

Hi,

Are there any typical patterns for deploying stateful streaming pipelines from 
Beam? We are targeting Dataflow and the Python SDK, with significant usage of 
stateful processing with long windows (typically days long).


From our current practice of maintaining pipelines we identified 3 
typical scenarios:


1. Deployment without breaking changes (no changes to the pipeline graph, 
coders, states, outputs etc.) - just update the Dataflow job in place.


2. Deployment with changes to internal state (changes to coders, state, 
or even the pipeline graph, but without changing pipeline input/output 
schemas) - in this case updating the job in place does not work, as the 
state has changed and reading state saved by the old pipeline would result in 
an error.


3. Deployment with changes to the output schema (and potentially to internal 
state too) - we need to take special care when changing the output schema to 
be sure downstream processes also have time to switch from the old version 
of the data to the new one.


To be specific, I need some advice/patterns/knowledge on points 2 and 3. I 
guess it will require spinning up new pipelines with data back-filling or 
migration jobs? I would really appreciate detailed examples of how you are 
dealing with deploying similar stateful streaming pipelines, ideally 
with details on how much data to reprocess to populate internal state, 
what needs to be done when changing the output schema of a pipeline, and how 
to orchestrate all these activities.


Best
Wiśniowski Piotr


Re: UDF/UADF over complex structures

2023-10-02 Thread Wiśniowski Piotr

Hi Gyorgy,

I guess your problem might not be directly related to UDFs or UDAFs but 
to the nested structure of your data. I had a problem with processing 
nested data and also did not find a way to work around it - especially if 
you are not in control of the input data structure.


These issues seem very related, as nested Rows need some polishing:

- https://github.com/apache/beam/issues/26911

- https://github.com/apache/beam/issues/27733

I have plans to jump on a fix for these two, and I guess it would not be 
hard, but I do not have any capacity for open source tasks for now. 
Would anyone like to take a look at them?


Best

Wiśniowski Piotr


On 28.09.2023 19:31, Balogh, György wrote:
Sorry, I was not specific enough. I meant using the SqlTransform 
registerUdf and registerUdaf. I use a lot of SQL in my pipeline and I 
would prefer using SQL UDFs in many cases over writing Beam 
transforms. I already have UDFs, but I did not find a way to make them 
work over nested structures.

Thank you,
Gyorgy

On Thu, Sep 28, 2023 at 5:40 PM Robert Bradshaw via user 
 wrote:


Yes, for sure. This is one of the areas where Beam excels vs.
simpler tools like SQL. You can write arbitrary code to
iterate over arbitrary structures in the typical
Java/Python/Go/Typescript/Scala/[pick your language] way. In
Beam nomenclature, UDFs correspond to DoFns and UDAFs correspond
to CombineFns.

On Thu, Sep 28, 2023 at 4:23 AM Balogh, György
 wrote:

Hi,
I have a complex nested structure in my input data. Is it possible
to have a UDF/UDAF that takes a nested structure as input? I'm using
Java. Outputting a nested structure is also a question.
Thank you,
Gyorgy
-- 


György Balogh
CTO
E   gyorgy.bal...@ultinous.com <mailto:zsolt.sala...@ultinous.com>
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com <http://www.ultinous.com>



--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com <mailto:zsolt.sala...@ultinous.com>
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com <http://www.ultinous.com>


Re: Issue with growing state/checkpoint size

2023-09-06 Thread Wiśniowski Piotr

Hi,

Sorry for the late response - I was busy with a pretty similar problem.

First, let me clarify the pipeline you use.

You have a pipeline with a step that produces two stream outputs. They 
flow through some computation independently and then you join them with 
CoGroupByKey in a session window and expect a match on message id. 
So you are trying to achieve a streaming inner join (which should emit 
output when the same message id arrives on both streams). Is that right?


My findings:

1. Session windows are very tricky to use with CoGroupByKey, as they 
are merging windows (so Beam may need to wait arbitrarily long, 
accumulating state, if one of the streams continuously outputs the same 
message id - this is the first thing to check in your case). This is only one 
of the potential corner cases with session windows. I would highly 
advise changing this to a plain sliding window on the processing time 
domain, as it would allow you to clearly specify how long the step should 
keep waiting for a particular message id and how frequently it should emit.


2. There is also the case that one of the streams might not produce a 
message id for a particular message. Double-check that every message 
processed by your pipelines emits exactly one message per message id. 
You also need to consider what to do with the state once a pair of 
messages is joined and emitted. By default SessionWindows will wait for 
the next messages with the same message id, accumulating state for some time 
after. This also might be the source of your problem.


Now the solution that worked for me for a similar problem:

1. Window both streams into FixedWindows(1) on the processing time domain. 
Make sure keys are set.


2. CoGroupByKey

3. Window the output of CoGroupByKey into GlobalWindows

4. Create a stateful ParDo with timers (to do cleanup) that captures 
state when a message arrives on one stream, emits both if both messages 
have arrived, does the proper action (up to you) when further messages with 
the same key arrive, and does the cleanup.


Explanation:

FixedWindows - makes sure that elements are grouped into 1 s batches for the 
join - this is just to meet the requirement from CoGroupByKey to have 
windows defined on the input streams.


CoGroupByKey - joins messages that arrive in the same one-second window.

GlobalWindows - makes sure that the stateful ParDo keeps state globally 
per key (as it will manage its state by itself).


stateful ParDo - does the waiting part for the message and contains 
all your logic for the corner cases defined above.


The above just worked perfectly for me. Let me know if it is a solution for 
your case too. We might want to introduce some common API for such cases 
(it is quite easy to code this in generic form).
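
A rough sketch of the stateful ParDo part in Python (the names, the one-hour 
cleanup horizon and the emit-first-pair policy are just illustrative choices):

```python
import apache_beam as beam
from apache_beam.coders import coders
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)


class JoinBothStreamsFn(beam.DoFn):
    """Input: (message_id, {'left': [...], 'right': [...]}) from CoGroupByKey,
    re-windowed into GlobalWindows. Emits a pair once both sides have arrived."""

    PENDING = ReadModifyWriteStateSpec('pending', coders.PickleCoder())
    CLEANUP = TimerSpec('cleanup', TimeDomain.WATERMARK)

    WAIT_SECONDS = 60 * 60  # how long to keep a one-sided message before dropping it

    def process(
        self,
        element,
        ts=beam.DoFn.TimestampParam,
        pending=beam.DoFn.StateParam(PENDING),
        cleanup=beam.DoFn.TimerParam(CLEANUP),
    ):
        key, grouped = element
        stored = pending.read() or {'left': [], 'right': []}
        stored['left'] += list(grouped['left'])
        stored['right'] += list(grouped['right'])
        if stored['left'] and stored['right']:
            # Both sides arrived: emit the joined pair and drop the state.
            yield key, (stored['left'][0], stored['right'][0])
            pending.clear()
        else:
            # Only one side so far: keep it and schedule an event-time cleanup.
            pending.write(stored)
            cleanup.set(ts + self.WAIT_SECONDS)

    @on_timer(CLEANUP)
    def on_cleanup(self, pending=beam.DoFn.StateParam(PENDING)):
        pending.clear()
```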


Best

Wiśniowski Piotr

 On 1.09.2023 15:56, Byron Ellis via user wrote:
Depends on why you're using a fan-out approach in the first place. You 
might actually be better off doing all the work at the same time.


On Fri, Sep 1, 2023 at 6:43 AM Ruben Vargas  
wrote:


Ohh I see

That makes sense. Wondering if there is a strategy for my use
case, where I have an ID unique per pair of messages.

Thanks for all your help!

On Fri, Sep 1, 2023 at 6:51 AM Sachin Mittal 
wrote:

Yes, a very high and non-deterministic cardinality can make the
stored state of the join operation unbounded.
In our case we knew the cardinality and it was not very high, so
we could go with a lookup-based approach using Redis to enrich
the stream and avoid joins.



On Wed, Aug 30, 2023 at 5:04 AM Ruben Vargas
 wrote:

Thanks for the reply and the advice

One more thing: do you know if the key-space cardinality
impacts this? I'm assuming it does, but the thing is, in
my case all the messages from the sources have a unique ID,
which makes my key-space huge, and it is not in my control.

On Tue, Aug 29, 2023 at 9:29 AM Sachin Mittal
 wrote:

So for the smaller collection, which does not
grow in size for certain keys, we stored the data in
Redis, and instead of a Beam join, in our DoFn we just did
the lookup and got the data we needed.


On Tue, 29 Aug 2023 at 8:50 PM, Ruben Vargas
 wrote:

Hello,

Thanks for the reply. Any strategy you followed to
avoid joins when you rewrote your pipeline?



On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal
 wrote:

Yes, we also faced the same issue when trying
to run a pipeline involving a join of two
collections. It was deployed using AWS KDA,
which uses the Flink runner. The source was
Kinesis streams.

Looks like join operations are not very

Re: Delete window information

2023-08-01 Thread Wiśniowski Piotr

Hi,

This is a very typical usage. The Beam abstraction requires that all events 
belong to a window (and the default window is a single global window), 
so it is not possible to delete the window information. But what you 
really want is to overwrite the current window information set for the 
first aggregation with new window information set for the second 
aggregation. It is very typical for Beam pipelines to set new windows 
per aggregation operation.


In practice I imagine you would just need

"back to global window" >> WindowInto(GlobalWindows())

before the second aggregation operation in the case of bounded sources. If you 
are working with an unbounded source (streaming aggregation), using the global 
window directly will fail, as it would need to wait forever to get all the 
data before emitting output. In this case you would also need to set 
some trigger and accumulation_mode to tell Beam when to emit elements 
for the aggregated keys.
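
A minimal sketch for the streaming case (the trigger choice is only an 
example; `first_aggregation` stands for the output of the first group-by):

```python
import apache_beam as beam
from apache_beam.transforms import trigger, window


def regroup_globally(first_aggregation):
    """first_aggregation: keyed PCollection produced by the first group-by."""
    return (
        first_aggregation
        # Re-window before the second aggregation; in streaming, the global
        # window needs a trigger so the aggregation can emit without waiting
        # for the (never-arriving) end of the global window.
        | "back to global window" >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterProcessingTime(60)),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
        )
        | "second aggregation" >> beam.CombinePerKey(sum)
    )
```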


Hope this helps. If not, please provide a bit more detail on what exactly 
you are trying to do so that I can try to help.
You could also try to get more familiar with the abstraction with the 
help of https://tour.beam.apache.org/.


Best
Wiśniowski Piotr


On 1.08.2023 09:17, Guagliardo, Patrizio via user wrote:


Hi together,

I have a question regarding Apache Beam in Python:

When I create a window with timestamps and then do a group-by, 
the window information remains in the elements. Afterwards, I 
want to do another group-by (something like a cumsum) by certain 
keys, but that does not work, as the windows are still there, and so it 
takes the keys + windows to split my data in the CombineFn. The question 
now: how can I "delete" the windows and timestamps so that I would 
combine records just by the defined keys in the last CombineFn?


Hope this is clear.

Best,

Patrizio




Re: Beam SQL found limitations

2023-05-31 Thread Wiśniowski Piotr

Hi Kenn,

Thanks for clarification.

1. Just to put an example in front: for every event that comes in, I 
need to find the corresponding previous event of the same user_id and pass 
previous_event_timestamp down in the current event payload (and the 
current event also becomes the previous event for future events of the 
same user). The question is how to do it with Beam SQL. I am aware that 
analytic windowing (like last_value over etc.) might not be a way for 
streaming, and I am OK with this - it makes sense under the hood, just as 
you mention.


The task is to be able to keep a simple state in streaming SQL. What I 
came up with is using a sliding window to have this state available 
for each new event that comes in.


```

WITH
unbounded_stream_initialized AS (
    SELECT
    user_id,
    event_time
    FROM unbounded_stream
    GROUP BY
    user_id,
    event_time,
    TUMBLE(event_time,INTERVAL '1' SECONDS)
    UNION ALL
    -- this is needed as first session window by default starts at 
first element, while here we need to start it in the past

    -- so that there is a window that ends just after first real element
    SELECT
    CAST(0 AS BIGINT) AS user_id,
    CAST('2023-05-19 08:00:00' AS TIMESTAMP) AS event_time
    FROM UNNEST(ARRAY[0]) AS arr -- this is dummy as syntax does not 
allow to have GROUP BY just after SELECT
    GROUP BY TUMBLE(CAST('2023-05-19 08:00:00' AS TIMESTAMP), INTERVAL 
'1' SECONDS)

),
test_data_1 AS (
    SELECT
    user_id,
    MAX(event_time) AS prev_event_time,
    HOP_END(event_time, INTERVAL '1' SECONDS, INTERVAL '7' DAYS) AS 
window_end_at

    FROM unbounded_stream_initialized
    GROUP BY
    user_id,
    HOP(
    -- first create a sliding window to aggregate state
    event_time,
    INTERVAL '1' SECONDS,
    INTERVAL '7' DAYS -- The idea is to have this quite long 
compared to interval

    )
),
test_data_1_lookup AS (
    SELECT
    user_id,
    prev_event_time
    FROM test_data_1
    GROUP BY
    user_id,
    -- then re-window into windows suitable for joining main stream
    TUMBLE(window_end_at, INTERVAL '1' SECONDS)
),
enriched_info AS (
    SELECT
    unbounded_stream_initialized.event_timestamp AS event_timestamp,
    unbounded_stream_initialized.user_id AS user_id,
    test_data_1_lookup.prev_event_time AS prev_event_time
    FROM unbounded_stream_initialized
    LEFT JOIN test_data_1_lookup
    ON unbounded_stream_initialized.user_id = 
test_data_1_lookup.user_id

)
SELECT
    *
FROM enriched_info

```

The doubt that I have is whether the above will store too much redundant 
data, as `test_data_1` suggests it could duplicate and store each 
incoming message in every window of the sliding window definition 
(which might be a lot in this case). Or is resolving whether a message 
belongs to a window actually done later, when evaluating the `LEFT JOIN`? 
Targeting Dataflow. I am still learning Beam, so there might be some core 
thing that I am missing about how it is processed.


2. Any hints on implementing FirestoreIOTableProvider? Just more or less 
how to do it, where to look for the important parts, etc. It seems we would 
need this functionality.


3. I will try to report some more interesting findings. If possible, 
please prioritize fixing this ROW error.


Best

Piotr

On 26.05.2023 21:36, Kenneth Knowles wrote:
Just want to clarify that Beam's concept of windowing is really an 
event-time based key, and they are all processed logically 
simultaneously. SQL's concept of windowing function is to sort rows 
and process them linearly. They are actually totally different. From 
your queries it seems you are interested in SQL's windowing functions 
(aka analytic functions).


I am surprised by the problems with rows, since we have used them 
extensively. Hopefully it is not too hard to fix. Same with the UNION 
ALL problem.


And for the CROSS JOIN it would be a nice feature to allow in some 
cases it seems. Should not be hard.


Thank you for reporting this! If you have time, it would be really 
great to get each of these reproducible problems into its own GitHub issue.


Kenn

On Fri, May 26, 2023 at 11:06 AM Wiśniowski Piotr 
 wrote:


Hi Alexey,

Thank You for reference to that discussion I do actually have
pretty similar thoughts on what Beam SQL needs.

Update from my side:

Actually did find a workaround for issue with windowing function
on stream. It basically boils down to using sliding window to
collect and aggregate the state. But would need an advice if this
is actually a cost efficient method (targeting DataFlow runner).
The doubt that I have is that this sliding window would need to
have sliding interval less than 1s and size more than a week and
be feed with quire frequent data. If I do understand this
correctly - it would mean each input row would need to be
duplicated for each window

Re: Beam SQL found limitations

2023-05-26 Thread Wiśniowski Piotr

Hi Alexey,

Thank you for the reference to that discussion. I actually have pretty 
similar thoughts on what Beam SQL needs.


Update from my side:

I actually found a workaround for the issue with windowing functions on a 
stream. It basically boils down to using a sliding window to collect and 
aggregate the state. But I would need advice on whether this is actually a 
cost-efficient method (targeting the Dataflow runner). The doubt that I have 
is that this sliding window would need to have a sliding interval of less 
than 1 s and a size of more than a week, and be fed with quite frequent data. 
If I understand this correctly, it would mean each input row would need 
to be duplicated for each window and stored, which could be a quite 
significant storage cost?


Or does Beam actually not physically duplicate the record, but just 
track which windows the record currently belongs to?



And the real thing that Beam SQL needs at the moment, in my opinion, is 
bug fixes.


Some bugs that I found that prevent one from using it, and for which I would 
really appreciate a fast fix:


- UNNEST ARRAY with a nested ROW (described below, created ticket - 
https://github.com/apache/beam/issues/26911)


- The Pub/Sub table provider actually requires all table properties to be 
there (with null in `timestampAttributeKey` it fails), which 
essentially does not allow one to use the Pub/Sub publish timestamp as 
the `timestampAttributeKey`.


- It is not possible to cast VARCHAR to BYTES, and BYTES is needed for 
DataStoreV1TableProvider to provide a key for storage. Also consider 
updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it 
requires VARCHAR instead of BYTES - it is even easier in implementation.


- Any hints on how to implement `FireStoreIOTableProvider`? I am 
considering implementing and contributing it, depending on my team's 
decision, but I would like to get an idea of how hard this task is.


I will create tickets for the rest of the issues when I have some spare time.

Best regards

Wiśniowski Piotr


On 22.05.2023 18:28, Alexey Romanenko wrote:

Hi Piotr,

Thanks for the details! I am cross-posting this to dev@ as well since, I 
guess, people there can provide more insights on this.


A while ago, I faced similar issues trying to run Beam SQL against the 
TPC-DS benchmark.
We had a discussion around that [1]; please take a look, since it can 
be helpful.


[1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

—
Alexey

On 18 May 2023, at 11:36, Wiśniowski Piotr 
 wrote:


HI,

After experimenting with Beam SQL I did find some limitations. 
Testing on near latest main (precisely 
`5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, direct 
runner and openjdk version "11.0.19". Please let me know if some of 
them are known/ worked on/ have tickets or have estimated fix time. I 
believe most of them are low hanging fruits or just my thinking is 
not right for the problem. If this is the case please guide me to 
some working solution.


 From my perspective it is ok to have a fix just on master - no need 
to wait for release. Priority order:
- 7. Windowing function on a stream - in detail - How to get previous 
message for a key? setting expiration arbitrary big is ok, but access 
to the previous record must happen fairly quickly not wait for the 
big window to finish and emit the expired keys. Ideally would like to 
do it in pure beam pipeline as saving to some external key/value 
store and then reading this here could potentially result in some 
race conditions which in I would like to avoid, but if its the only 
option - let it be.

- 5. single UNION ALL possible
- 4. UNNEST ARRAY with nested ROW
- 3. Using * when there is Row type present in the schema
- 1. `CROSS JOIN` between two unrelated tables is not supported - 
even if one is a static number table

- 2. ROW construction not supported. It is not possible to nest data

Below queries tat I use to testing this scenarios.

Thank You for looking at this topics!

Best

Wiśniowski Piotr

---
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
---
-- Only supported is `CROSS JOIN UNNEST` when exploding array from 
same table.

-- It is not possible to number rows
WITH data_table AS (
    SELECT 1 AS a
),
number_table AS (
    SELECT
        numbers_exploded AS number_item
    FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
)
SELECT
    data_table.a,
    number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!
---
-- 2. ROW construction not supported. It is not possible to nest data
---
SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
SELECT MAP['field1','b','field2','a']; -- null
-- WORKAROUND - manually compose json string,
-- drawback - decomposing might be not supported or 

Beam SQL found limitations

2023-05-18 Thread Wiśniowski Piotr

Hi,

After experimenting with Beam SQL I did find some limitations. Testing 
on a near-latest main (precisely 
`5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct runner, 
and openjdk version "11.0.19". Please let me know if any of them are 
known, being worked on, have tickets, or have an estimated fix time. I believe 
most of them are low-hanging fruit, or my thinking is just not right for 
the problem - if that is the case, please guide me to some working solution.


From my perspective it is ok to have a fix just on master - no need to 
wait for a release. Priority order:
- 7. Windowing function on a stream - in detail: how to get the previous 
message for a key? Setting the expiration arbitrarily large is ok, but access 
to the previous record must happen fairly quickly, not wait for the big 
window to finish and emit the expired keys. Ideally I would like to do it 
in a pure Beam pipeline, as saving to some external key/value store and 
then reading it back could potentially result in race conditions, 
which I would like to avoid - but if it is the only option, let it be. 
(A rough state-based sketch is included right after this list.)

- 5. single UNION ALL possible
- 4. UNNEST ARRAY with nested ROW
- 3. Using * when there is a ROW type present in the schema
- 1. `CROSS JOIN` between two unrelated tables is not supported - even 
if one is a static number table

- 2. ROW construction not supported. It is not possible to nest data
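
For reference, a minimal sketch of the state-based approach to point 7 in the Python SDK - a sketch only, assuming string values; ordering guarantees and state expiry (a cleanup timer) are left out, and names such as `PairWithPrevious` are just illustrative:

```
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec


class PairWithPrevious(beam.DoFn):
    """For each (key, value) element, emit (key, (previous_value, value))."""

    PREV_STATE = ReadModifyWriteStateSpec('prev', StrUtf8Coder())

    def process(self, element, prev=beam.DoFn.StateParam(PREV_STATE)):
        key, value = element
        previous = prev.read()  # None for the first element seen for this key
        prev.write(value)
        yield key, (previous, value)


with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create([('user-1', 'a'), ('user-1', 'b'), ('user-2', 'x')])
        | beam.ParDo(PairWithPrevious())
        | beam.Map(print)
    )
```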

Below are the queries that I use to test these scenarios.

Thank You for looking at this topics!

Best

Wiśniowski Piotr

---
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
---
-- Only supported is `CROSS JOIN UNNEST` when exploding array from same 
table.

-- It is not possible to number rows
WITH data_table AS (
    SELECT 1 AS a
),
number_table AS (
    SELECT
        numbers_exploded AS number_item
    FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]) AS numbers_exploded
)
SELECT
    data_table.a,
    number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!
---
-- 2. ROW construction not supported. It is not possible to nest data
---
SELECT ROW(1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1,2,'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1',1,'field2','a']; -- Parameters must be of the same type
SELECT MAP['field1','b','field2','a']; -- null
-- WORKAROUND - manually compose a json string,
-- drawback - decomposing might not be supported or would need to be
-- also based on string operations
SELECT ('{"field1":"'||1||'","field2":"'||'a'||'"}') AS `json_object`;
---
-- 3. Using * when there is Row type present in the schema
---
CREATE EXTERNAL TABLE test_tmp_1(
    `ref` VARCHAR,
    `author` ROW<
        `name` VARCHAR,
        `email` VARCHAR
    >
)
TYPE text
LOCATION 'python/dbt/tests/using_star_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"top/python/dbt/tests/dead"}';

SELECT * FROM test_tmp_1;
-- java.lang.NoSuchFieldException: name
-- WORKAROUND - refer to columns explicitly with an alias
SELECT
    `ref` AS ref_value,
    test_tmp_1.`author`.`name` AS author_name, -- table name must be referenced explicitly - this could be fixed too
    test_tmp_1.`author`.`email` AS author_email
FROM test_tmp_1;
---
-- 4. UNNEST ARRAY with nested ROW
---
CREATE EXTERNAL TABLE test_tmp(
    `ref` VARCHAR,
    `commits` ARRAY<ROW<
        `id` VARCHAR,
        `author` ROW<`name` VARCHAR>
    >>
)
TYPE text
LOCATION 'python/dbt/tests/array_with_nested_rows_limitation.jsonl'
TBLPROPERTIES '{"format":"json", "deadLetterFile":"python/dbt/tests/dead"}';
SELECT
    test_tmp.`ref` AS branch_name,
    commit_item.`id` AS commit_hash,
    commit_item.`author`.`name` AS author_name
FROM test_tmp
CROSS JOIN UNNEST(test_tmp.commits) AS commit_item;
-- Row expected 4 fields (Field{name=ref, description=, type=STRING, options={{}}},
-- Field{name=commits, description=, type=ARRAY<ROW<...>> NOT NULL, options={{}}},
-- Field{name=id, description=, type=STRING, options={{}}},
-- Field{name=author, description=, type=ROW<...>, options={{}}}). initialized with 5 fields.
-- limited WORKAROUND - refer to array elements by index and UNION ALL 
the items into rows
-- note workaround that uses number table will not work as CROSS JOIN is 
not supported

WITH data_parsed AS (
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[1].`id` AS commit_hash,
        test_tmp.commits[1].`author`.`name` AS author_name
    FROM test_tmp
    UNION ALL -- this unfortunately works only for two indexes
    SELECT
        test_tmp.`ref` AS branch_id,
        test_tmp.commits[2].`id` AS commit_hash,
        test_tmp.commits[2].`author`.`name` AS author_name
    FROM test_tmp
)
SELECT *
FROM data_parsed
WHERE author_name IS NOT NULL
;
-- better WORKAROUND - but tricky to get right (fragile)
WITH data_with_number_array AS (
    SELECT
        test_tmp.`ref` AS branch_name, -- there must be some primary key in the data to join on later due t

Re: Can I batch data when i use JDBC write operation?

2023-04-21 Thread Wiśniowski Piotr

Hi,

Not tested, but a few options that might be solutions for Your problem:

1. go with having read and write replicas of Your DB - so that the write 
replica would get inserts one by one - and live with this. Make sure to 
deduplicate the data before insert to avoid potential collisions (this 
should not be a problem, but I am not sure how the subsystem would behave)


2.

- Add a step to group the input data into a time window

- then ingest the events from a window into a unique temp table (just by a 
plain `INSERT INTO`)


- then add a next step in the pipeline to trigger a merge operation from the tmp 
table to Your production table. Make sure the same connection session is used 
or the tmp table will be gone. Also I am not sure if it is possible to invoke 
only one command per window by using the `WriteToJdbc` operator after the 
previous write finishes - but this is up for Your experimentation. Maybe anyone 
with more experience knows how to code it?


3. Another option is to

- aggregate events into a window - so that only one element would be 
emitted per window (an array of events)

- try somehow to put this record into the statement as a single row

- in subsequent CTEs deserialize the array into multiple rows

- do the insert with update.

(A rough Python sketch of the Beam side of this is included after point 4 below.)

So the sql on the DB side would look something like:

```

WITH new_values (arr) AS (
    VALUES (?)
),
deser AS (
    SELECT explode(arr) FROM new_values
),
upsert AS (
    UPDATE mytable m
    SET field1 = nv.field1, field2 = nv.field2
    FROM deser nv
    WHERE m.id = nv.id
    RETURNING m.*
)
INSERT INTO mytable (id, field1, field2)
SELECT id, field1, field2
FROM deser
WHERE NOT EXISTS (SELECT 1 FROM upsert up WHERE up.id = new_values.id)


```

Above is just pseudocode that I did not test, but it could be a hint for 
You. Also there is a great answer on this here: 
https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291


4. some mix of points 2 and 3
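
As a rough illustration of the Beam side of option 3 - a sketch only, not tested; `BatchRow` and the in-memory stand-in source are hypothetical, and the final step would be replaced with the actual `WriteToJdbc`:

```
import json
import typing

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import combiners


class BatchRow(typing.NamedTuple):
    arr: str  # one JSON-encoded array of events per window


with beam.Pipeline() as p:
    _ = (
        p
        | "Events" >> beam.Create([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])  # stand-in for the Kafka source
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        | "Collect window" >> beam.CombineGlobally(combiners.ToListCombineFn()).without_defaults()
        | "One row per window" >> beam.Map(lambda events: BatchRow(arr=json.dumps(events))).with_output_types(BatchRow)
        | "Debug" >> beam.Map(print)
        # Replace the Map(print) above with the JDBC write, so the database
        # receives a single array-valued parameter per window.
    )
```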

Hopefully this helps You in breaking down the problem.

Best regards

Wiśniowski Piotr

On 21.04.2023 01:34, Juan Romero wrote:

Hi. Can someone help me with this?

On Wed, Apr 19, 2023 at 15:08, Juan Romero () 
wrote:


Hi community.

On this occasion I have a doubt regarding how to read a stream
from Kafka and write batches of data with the JDBC connector. The
idea is to overwrite a specific row if the row we want to
insert has the same id and a greater load_date_time. The
conceptual pipeline looks like this and it is working (keep in mind
that the source will be a stream from Kafka):

ExampleRow = typing.NamedTuple('ExampleRow', id=int, name=str, load_date_time=str)

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(
            [
                ExampleRow(1, '', '2023-04-05 12:34:56'),
                ExampleRow(1, 'yyyz', '2023-04-05 12:34:55')
            ]).with_output_types(ExampleRow)
        | 'Write to jdbc' >> WriteToJdbc(
            driver_class_name='org.postgresql.Driver',
            jdbc_url='jdbc:postgresql://localhost:5432/postgres',
            username='postgres',
            password='postgres',
            table_name='test',
            connection_properties="stringtype=unspecified",
            statement='INSERT INTO test \
                       VALUES(?,?,?) \
                       ON CONFLICT (id) \
                       DO UPDATE SET name = EXCLUDED.name, load_date_time = EXCLUDED.load_date_time \
                       WHERE EXCLUDED.load_date_time::timestamp > test.load_date_time::timestamp',
        ))

My question is: if I want to write a stream that comes from Kafka,
how can I avoid the JDBC connector inserting the records one by
one and instead insert the data in time-based batches? Probably
internally JDBC has some kind of "intelligence" for doing this, but
I want to know what you think about it.

Thank you!


Re: [java] Trouble with gradle and using ParquetIO

2023-04-21 Thread Wiśniowski Piotr

Hi Evan,

Just to have full knowledge:

- "provided" should be used when You expect the target cluster on 
environment to have the package of interest installed so you do not have 
to include it in the pipeline jar (this is to have it more lightweight 
and easier to maintain coherent target jre env across organization).


- it seems that You should either install the library on You target env 
or include it in your build jar. Up to Your specific use case. Typically 
corporation envs provide commonly used libs in their envs like spark, 
and IO libs - and this might be the reason that maven suggest this.


Best

Wiśniowski Piotr

On 21.04.2023 08:30, Moritz Mack wrote:


Hi Evan,

Not sure why maven suggests using “compileOnly”.

That’s certainly wrong, make sure to use “implementation” in your case.

Cheers, Moritz

On 21.04.23, 01:52, "Evan Galpin"  wrote:


Hi all,

I'm trying to make use of ParquetIO.  Based on what's documented in 
maven central, I'm including the artifact in "compileOnly" mode (or in 
maven parlance, 'provided' scope).  I can successfully compile my 
pipeline, but when I run it I (intuitively?) am met with a 
ClassNotFound exception for ParquetIO.


Is 'compileOnly' still the desired way to include ParquetIO as a 
pipeline dependency?


Thanks,

Evan




Re: Beam shell sql with zeta

2023-04-20 Thread Wiśniowski Piotr

Thank You for the reply and the hint.

1. Yes, I did try with Calcite `ROW` too - `java.lang.NoSuchFieldException: 
head (state=,code=0)`, but on the transformation side: `SELECT * FROM 
etl_raw LIMIT 1`. Maybe I need to refer directly to the field that I need 
instead of using `*`? Do You know off the top of your head what the syntax 
is to get the `repo_state.head.commit` value? I also tried adding more fields 
to the table def, but no luck.


2.

> doesn't actually do anything on the SQL shell at query parse time

This is also my observation. What is the proper way to initialize Zeta? 
I did try `./bin/shell 
--plannerName=org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner` 
and a few other variations, but it seems like sqlline does not allow 
passing parameters to pipeline options.


I do not have any requirement specific to Zeta, but it seems it has a 
bit more support for JSON functions than Calcite.


3. Any other idea to work around this issue?

Thanks for the support!

For context, I am trying to set up a POC with these SQLs, as it could really 
speed up development and maintenance of our pipelines, plus SQL has low 
knowledge requirements for actually understanding the logic (reading 
the code). If this works it might be a great (killer) feature for Beam.


Best

Wisniowski Piotr



On 20.04.2023 20:52, Andrew Pilloud via user wrote:

set plannerName doesn't actually do anything on the SQL shell at query 
parse time, it will still use the Calcite parser. Have you tried 
Calcite SQL?


Support for structs is somewhat limited. I know there are bugs around 
nested structs and structs with single values.


Andrew




Beam shell sql with zeta

2023-04-20 Thread Wiśniowski Piotr

Hi,

I have a question regarding usage of Zeta with SQL extensions in SQL 
shell. I try to:


```

SET runner = DirectRunner;
SET tempLocation = `/tmp/test/`;
SET streaming=`True`;
SET plannerName = 
`org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner`;


CREATE EXTERNAL TABLE etl_raw(
    event_timestamp TIMESTAMP,
    event_type VARCHAR,
    message_id VARCHAR,
    tracking_source VARCHAR,
    tracking_version VARCHAR,
    `repo_state` STRUCT<`head` STRUCT<`commit` VARCHAR ,`name` VARCHAR>>
)
TYPE pubsub
LOCATION 'projects/xxx/topics/xxx'
TBLPROPERTIES '{"format":"json"}';

```

But I get the error `parse failed: Encountered "STRUCT"`.

If I change the `STRUCT` to `ROW` (as in Calcite) the DDL passes, but 
I still fail to receive data on

`SELECT * FROM etl_raw LIMIT 1;`, getting the exception 
`java.lang.NoSuchFieldException: head (state=,code=0)`, even though I am sure 
that the field is there in the json payload.


With the `repo_state` field commented out I am able to retrieve the data. 
Unfortunately I do not have control over the payload structure (it is a 
3rd-party hook), so I cannot make it flat.


In general I am unable to parse a json message from Pub/Sub that has a 
structured field.


Is anyone familiar with this part of Beam's functionality?

Best regards

Wisniowski Piotr






Re: Backup event from Kafka to S3 in parquet format every minute

2023-02-20 Thread Wiśniowski Piotr

Hi Alexey,

I am just learning Beam and doing a POC that requires fetching stream data 
from Pub/Sub and partitioning it on GCS as Parquet files with a constant 
window. The thing is, I have an additional requirement to use ONLY SQL.


I did not manage to do it. My solutions either ran indefinitely or 
failed with `GroupByKey cannot be applied to an unbounded PCollection 
with global windowing and a default trigger`, despite having the window 
definition in the exact same CTE. Exactly what I tried You can find 
here: https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph


Here my knowledge of Beam ends. My hypothesis is that `WriteToParquet` 
supports only bounded data and does not automatically partition data. So 
the OP could use it by batching the data in memory (into individual 
windows) and then applying `WriteToParquet` to each collected batch 
individually. But this is more of a guess than knowledge. Please let me 
know if this is not correct.
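
For illustration, a minimal sketch of this one-file-per-window idea in the Python SDK - not tested; `fileio.TextSink` stands in for a Parquet sink, the in-memory source stands in for the streaming source, and the bucket path is a placeholder:

```
import apache_beam as beam
from apache_beam import window
from apache_beam.io import fileio

with beam.Pipeline() as p:
    _ = (
        p
        | "Source" >> beam.Create(["event-1", "event-2"])  # stand-in for the PubSub/Kafka source
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        | "Write per window" >> fileio.WriteToFiles(
            path="gs://some-bucket/output/",  # placeholder location
            sink=lambda dest: fileio.TextSink(),
        )
    )
```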


As for my case, I cannot test it, since I am limited to pure SQL only, where 
I can only play with the table definition. And I did not see any table 
parameters that could be responsible for partitioning. If there are any, 
please let me know.

If You remember that it is possible to read or write partitioned 
Parquet files with just a `PTransform`, that's great! I probably 
made some minor mistake in my trials, but I am eager to learn what the 
mistake was.


Best regards

Wiśniowski Piotr



I did find a solution like this one: 
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java. 
This probably could help the OP, since the OP tried to save the top-level `PCollection` 
as `Parquet` instead of saving each partition separately, as done in 
`WriteOneFilePerWindow`.


On 17.02.2023 18:38, Alexey Romanenko wrote:

Piotr,

On 17 Feb 2023, at 09:48, Wiśniowski Piotr 
 wrote:


Does this mean that Parquet IO does not support partitioning, and we 
need to do some workarounds? Like explicitly mapping each window to a 
separate Parquet file?




Could you elaborate a bit more on this? IIRC, we used to read 
partitioned Parquet files with ParquetIO while running TPC-DS benchmark.


—
Alexey


Re: Backup event from Kafka to S3 in parquet format every minute

2023-02-17 Thread Wiśniowski Piotr

Hi,

Sounds like the exact problem that I had a few emails before - 
https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph


Does this mean that Parquet IO does not support partitioning, and we 
need to do some workarounds? Like explicitly mapping each window to a 
separate Parquet file? This could be a solution in Your case, if it 
works (just an idea worth trying - I did not test it and do not have enough 
experience with Beam), but I am limited only to pure SQL and not sure 
how I can do it.


Hope this helps with Your problem, and that Beam support can find some 
solution for my case too.

Best

Wiśniowski Piotr

On 17.02.2023 02:00, Lydian wrote:
I want to make a simple Beam pipeline which will store the events from 
kafka to S3 in parquet format every minute.


Here's a simplified version of my pipeline:

def add_timestamp(event: Any) -> Any:
    from datetime import datetime
    from apache_beam import window
    return window.TimestampedValue(event, datetime.timestamp(event[1].timestamp))

# Actual Pipeline
(
    pipeline
    | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
    | "Transformed" >> beam.Map(my_transform)
    | "Add timestamp" >> beam.Map(add_timestamp)
    | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
    | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
)


However, the pipeline failed with:

`GroupByKey cannot be applied to an unbounded PCollection with global 
windowing and a default trigger`


This seems to be coming from 
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146 which 
always adds a `GlobalWindows` and thus causes this error. Wondering 
what I should do to correctly back up the events from Kafka (unbounded) 
to S3. Thanks!


btw, I am running with the `portableRunner` with Flink. Beam version is 
2.41.0 (the latest version seems to have the same code) and Flink 
version is 1.14.5




Sincerely,
Lydian Lee


Fwd: Beam SQL Extensions Windowing question/problem

2023-02-13 Thread Wiśniowski Piotr

Hi,

I am quite new to the Calcite and ZetaSQL SQL dialects, but I did read all 
the docs I could find and could not make the code below work. Any ideas what 
I could be doing wrong? (Beam 2.44 or latest main builds behave the 
same, DirectRunner)


```

CREATE EXTERNAL TABLE pub_sub_example(
    event_timestamp TIMESTAMP,
    `type` VARCHAR,
    `value` INTEGER
)
TYPE pubsub
LOCATION 'projects/some-project/topics/etl-raw-user-behaviour'
TBLPROPERTIES '{"format":"json"}';

```

Below properly returns results:

```SELECT * FROM pub_sub_example LIMIT 2;```

But the one below runs indefinitely:

```

SELECT
    `type` AS type_kind,
    TUMBLE_START(event_timestamp, INTERVAL '1' SECOND) AS win,
    COUNT(*) AS count_of_messages
FROM pub_sub_example
GROUP BY TUMBLE(event_timestamp, INTERVAL '1' SECOND), `type`
LIMIT 2;

```

Moreover when I do:

```

CREATE EXTERNAL TABLE pub_sub_hist (
    type_kind VARCHAR,
    win TIMESTAMP,
    count_of_messages INTEGER
)
TYPE parquet
LOCATION 'gs://etl-user-behaviour/output'
TBLPROPERTIES '{"file_name_suffix":".parquet", 
"shard_name_template":"/S_of_N", 
"mime_type":"application/vnd.apache.parquet"}'

;

INSERT INTO pub_sub_hist
SELECT
    `type` AS type_kind,
    TUMBLE_START(event_timestamp, INTERVAL '1' SECOND) AS win,
    COUNT(*) AS count_of_messages
FROM pub_sub_example
GROUP BY TUMBLE(event_timestamp, INTERVAL '1' SECOND), `type`;

```

I get an error that I cannot understand:

groupByKey cannot be applied to non-bounded PCollection in the 
GlobalWindow without a trigger. Use a Window.into or Window.triggering 
transform prior to GroupByKey. (state=,code=0)


I think that I am providing a window as `TUMBLE(event_timestamp, 
INTERVAL '1' SECOND)` in the exact `GROUP BY` step, but it still treats 
the table as being in the `GlobalWindow`?
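
For clarity, the aggregation that this query tries to express, written directly in the Python SDK, would be roughly the following - a sketch only, with in-memory (type, value) tuples standing in for the Pub/Sub source:

```
import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import combiners

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([("click", 1), ("click", 2), ("view", 7)])
        # In a real streaming pipeline the elements would carry event timestamps.
        | "1s windows" >> beam.WindowInto(window.FixedWindows(1))
        | "Count per type" >> combiners.Count.PerKey()
        | beam.Map(print)
    )
```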


Moreover, I found in the Calcite SQL docs that this kind of windowing is 
deprecated: 
https://calcite.apache.org/docs/reference.html#grouped-window-functions 
and even the Dataflow SQL examples use the new syntax 
(https://cloud.google.com/dataflow/docs/reference/sql/streaming-extensions), 
which is also present in Calcite 
(https://calcite.apache.org/docs/reference.html#tumble), but Beam SQL 
does not parse it. Any idea what the roadmap is for these functionalities?


I would really appreciate a quick reply.

Best regards

Wiśniowski Piotr