Self-correction: as you are using a streaming pipeline without final
watermark emission (I suppose), option (a) will not work. Patching the
sink to support generic windowing would probably be much more involved.
On 3/14/24 14:07, Jan Lukavský wrote:
Hi Ondřej,
I'll start with a disclaimer; I'm not exactly an expert on either the
Python SDK or ParquetIO, so please take these just as suggestions off
the top of my head.
First, it seems that the current implementation of WriteToParquet
really does not play well with streaming pipelines. There are several
options that could be used to overcome this limitation:
a) you can try fixing the sink; maybe adding an
AfterWatermark.pastEndOfWindow() trigger might be enough to make it
work (needs to be tested; see the first sketch below)
b) if the Java implementation of ParquetIO works for streaming
pipelines (and I would suppose it does), you can use a cross-language
transform to run ParquetIO from Python, see [1] for a quick start
c) generally speaking, using a full-blown streaming engine for tasks
like "buffer this and store it in bulk after a timeout" is
inefficient. An alternative approach would be to use a plain
KafkaConsumer, create Parquet files on local disk, push them to GCS,
and commit offsets afterwards (see the second sketch below). Streaming
engines buffer data in replicated distributed state, which adds
unneeded complexity
d) if there is some non-trivial processing between consuming elements
from Kafka and writing outputs, then an alternative might be to
process the data in a streaming pipeline, write the outputs back to
Kafka (see the last sketch below), and then use approach (c) to get
them to GCS
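To illustrate (a), a minimal sketch of adding the trigger (untested;
'records' stands for your input PCollection and 'pyarrow_schema' for a
pyarrow.Schema describing the records, the path and window size are
placeholders):

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode

windowed = records | "Window" >> beam.WindowInto(
    FixedWindows(60),  # e.g. 1-minute windows
    # fire once the watermark passes the end of the window
    trigger=AfterWatermark(),
    accumulation_mode=AccumulationMode.DISCARDING)

windowed | "WriteParquet" >> beam.io.WriteToParquet(
    file_path_prefix="gs://my-bucket/output/part",  # placeholder path
    schema=pyarrow_schema)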
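For (c), a rough sketch of the non-Beam alternative (untested; assumes
the confluent-kafka, pyarrow and google-cloud-storage packages, with a
hypothetical decode_avro() doing the Avro deserialization; brokers,
topic and bucket names are placeholders):

from confluent_kafka import Consumer
import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage

consumer = Consumer({
    "bootstrap.servers": "broker:9092",  # placeholder brokers
    "group.id": "parquet-writer",
    "enable.auto.commit": False,  # commit manually, only after the upload
})
consumer.subscribe(["my-topic"])  # placeholder topic
bucket = storage.Client().bucket("my-bucket")  # placeholder bucket

rows, batch_no = [], 0
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is not None and msg.error() is None:
        rows.append(decode_avro(msg.value()))  # hypothetical decoder
    if len(rows) >= 10000:  # or flush after a timeout
        path = "/tmp/batch-%d.parquet" % batch_no
        pq.write_table(pa.Table.from_pylist(rows), path)
        bucket.blob("cdc/batch-%d.parquet" % batch_no).upload_from_filename(path)
        consumer.commit()  # offsets committed only after the file is in GCS
        rows, batch_no = [], batch_no + 1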
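And for the write-back-to-Kafka part of (d), the Python SDK already
ships a cross-language Kafka sink; a minimal sketch (brokers and topic
are placeholders; elements are assumed to be (key, value) pairs of
bytes, matching the default serializers):

from apache_beam.io.kafka import WriteToKafka

processed | "WriteBack" >> WriteToKafka(
    producer_config={"bootstrap.servers": "broker:9092"},  # placeholder
    topic="processed-topic")  # placeholder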
The specific solution depends on the actual requirements of your
customers.
Best,
Jan
[1]
https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
On 3/14/24 09:34, Ondřej Pánek wrote:
Basically, this is the error we receive when trying to use the Avro or
Parquet sinks (attached image).
Also, check the sample pipeline that triggers this error (when
deploying with DataflowRunner). So obviously, there is no global
window or default trigger. That’s, I believe, what’s described in this
issue: https://github.com/apache/beam/issues/25598
*From: *Ondřej Pánek <ondrej.pa...@bighub.cz>
*Date: *Thursday, March 14, 2024 at 07:57
*To: *user@beam.apache.org <user@beam.apache.org>
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python
Hello, thanks for the reply!
Please refer to these:
* https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
* https://github.com/apache/beam/issues/25598
Best,
Ondrej
*From: *XQ Hu via user <user@beam.apache.org>
*Date: *Thursday, March 14, 2024 at 02:32
*To: *user@beam.apache.org <user@beam.apache.org>
*Cc: *XQ Hu <x...@google.com>
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python
Can you explain more about "current sinks for Avro and Parquet with
the destination of GCS are not supported"?
We do have AvroIO and ParquetIO
(https://beam.apache.org/documentation/io/connectors/) in Python.
On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek <ondrej.pa...@bighub.cz>
wrote:
Hello Beam team!
We’re currently onboarding a customer’s infrastructure to the
Google Cloud Platform. The decision was made that one of the
technologies they will use is Dataflow. Let me briefly describe the
use-case specification:
They have a Kafka cluster where data from a CDC data source is
stored. The data in the topics is stored in Avro format. Their
other requirement is a streaming solution that reads from these
Kafka topics and writes to Google Cloud Storage, again in Avro.
What’s more, the component should be written in Python, since
their Data Engineers heavily prefer Python over Java.
We’ve been struggling with the design of the solution for a couple
of weeks now, and we’re facing quite an unfortunate situation,
not really finding any solution that fits these requirements.
So the question is: Is there any existing Dataflow
template/solution with the following specifications:
* Streaming connector
* Written in Python
* Consumes from Kafka topics
* Reads Avro with Schema Registry
* Writes Avro to GCS
We found out that the current sinks for Avro and Parquet with
GCS as the destination are not supported for Python at the moment,
which is basically the main blocker now.
Any recommendations/suggestions would be really highly appreciated!
Maybe the solution really does not exist and we need to create
our own custom connector for it. The question in that case would
be whether that’s even theoretically possible, since we would really
need to avoid another dead end.
Thanks a lot for any help!
Kind regards,
Ondrej