Self-correction: since you are (I suppose) using a streaming pipeline without final watermark emission, option (a) will not work. Patching the sink to support generic windowing would probably be much more involved.

On 3/14/24 14:07, Jan Lukavský wrote:

Hi Ondřej,

I'll start with a disclaimer: I'm not exactly an expert on either the Python SDK or ParquetIO, so please take these just as suggestions from the top of my head.

First, it seems that the current implementation of WriteToParquet really does not play well with streaming pipelines. There are several options that could be used to overcome this limitation:

 a) you can try fixing the sink; maybe adding an AfterWatermark.pastEndOfWindow() trigger is enough to make it work (needs to be tested; see the first sketch below this list).

 b) if the Java implementation of ParquetIO works for streaming pipelines (and I suppose it does), you can use a cross-language transform to run ParquetIO from Python; see [1] for a quick start.

 c) generally speaking, using a full-blown streaming engine for tasks like "buffer this and store it in bulk after a timeout" is inefficient. An alternative approach would be to just use a KafkaConsumer, create Parquet files on local disk, push them to GCS and commit offsets afterwards (see the second sketch below this list). Streaming engines buffer data in replicated distributed state, which adds unneeded complexity.

 d) if there is some non-trivial processing between consuming elements from Kafka and writing the outputs, an alternative might be to process the data in a streaming pipeline, write the outputs back to Kafka, and then use approach (c) to get them to GCS.
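
For (a), a minimal sketch of the user-facing half, i.e. windowing the unbounded PCollection with an AfterWatermark trigger before the sink; the window size, the records PCollection, parquet_schema and the output path are assumptions, and the sink itself may still need patching:

    import apache_beam as beam
    from apache_beam import window
    from apache_beam.io.parquetio import WriteToParquet
    from apache_beam.transforms import trigger

    # `records` is an unbounded PCollection of dicts matching `parquet_schema`
    # (both hypothetical). Emit each 5-minute window once the watermark passes it.
    _ = (
        records
        | "Window" >> beam.WindowInto(
            window.FixedWindows(5 * 60),
            trigger=trigger.AfterWatermark(),
            accumulation_mode=trigger.AccumulationMode.DISCARDING)
        | "Write" >> WriteToParquet(
            "gs://my-bucket/output/part",   # hypothetical GCS prefix
            parquet_schema,
            file_name_suffix=".parquet"))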

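For (c), a rough sketch of the plain-consumer variant, assuming kafka-python, pyarrow and google-cloud-storage; the topic, the bucket and the decode_avro helper are placeholders:

    import io

    import pyarrow as pa
    import pyarrow.parquet as pq
    from google.cloud import storage
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "cdc-topic",                              # hypothetical topic
        bootstrap_servers="broker:9092",
        group_id="parquet-archiver",
        enable_auto_commit=False)                 # commit only after the upload succeeds
    bucket = storage.Client().bucket("my-bucket") # hypothetical bucket

    def flush(records, batch_id):
        # Write one batch of dicts as a single Parquet object and upload it to GCS.
        table = pa.Table.from_pylist(records)
        buf = io.BytesIO()
        pq.write_table(table, buf)
        bucket.blob(f"cdc/batch-{batch_id:08d}.parquet").upload_from_string(buf.getvalue())

    batch, batch_id = [], 0
    for message in consumer:
        batch.append(decode_avro(message.value))  # decode_avro: your Schema Registry aware decoder
        if len(batch) >= 10_000:                  # or flush on a timeout
            flush(batch, batch_id)
            consumer.commit()                     # offsets advance only after data is on GCS
            batch, batch_id = [], batch_id + 1

Because offsets are committed only after the upload, this gives at-least-once delivery; duplicate objects produced by a retry need to be tolerated or deduplicated downstream.
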
The specific solution depends on the actual requirements of your customers.

Best,

 Jan

[1] https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/

On 3/14/24 09:34, Ondřej Pánek wrote:

Basically, this is the error we receive when trying to use the Avro or Parquet sinks (attached image).

Also, check the sample pipeline that triggers this error (when deploying with the DataflowRunner). As you can see, there is no global window or default trigger in it. That’s, I believe, what’s described in this issue: https://github.com/apache/beam/issues/25598

*From: *Ondřej Pánek <ondrej.pa...@bighub.cz>
*Date: *Thursday, March 14, 2024 at 07:57
*To: *user@beam.apache.org <user@beam.apache.org>
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Hello, thanks for the reply!

Please, refer to these:

  * https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  * https://github.com/apache/beam/issues/25598

Best,

Ondrej

*From: *XQ Hu via user <user@beam.apache.org>
*Date: *Thursday, March 14, 2024 at 02:32
*To: *user@beam.apache.org <user@beam.apache.org>
*Cc: *XQ Hu <x...@google.com>
*Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python

Can you explain more about "that current sinks for Avro and Parquet with the destination of GCS are not supported"?

We do have AvroIO and ParquetIO (https://beam.apache.org/documentation/io/connectors/) in Python.
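
For reference, a minimal batch example of the Python ParquetIO sink; the schema, the records and the GCS path are made up, and WriteToAvro from apache_beam.io.avroio is used analogously:

    import apache_beam as beam
    import pyarrow as pa
    from apache_beam.io.parquetio import WriteToParquet

    # Hypothetical schema and elements, just to illustrate the sink's API.
    schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
            | WriteToParquet(
                "gs://my-bucket/output/part",   # hypothetical GCS prefix
                schema,
                file_name_suffix=".parquet"))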

On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek <ondrej.pa...@bighub.cz> wrote:

    Hello Beam team!

    We’re currently onboarding a customer’s infrastructure to the
    Google Cloud Platform. The decision was made that one of the
    technologies they will use is Dataflow. Let me briefly describe
    the use-case specification:

    They have a Kafka cluster where data from a CDC data source is
    stored. The data in the topics is stored in Avro format. Their
    other requirement is a streaming solution that reads from these
    Kafka topics and writes to Google Cloud Storage, again in Avro.
    What’s more, the component should be written in Python, since
    their Data Engineers heavily prefer Python over Java.

    We’ve been struggling with the design of the solution for a
    couple of weeks now, and we’re in quite an unfortunate situation,
    not really finding any solution that fits these requirements.

    So the question is: Is there any existing Dataflow
    template/solution with the following specifications:

      * Streaming connector
      * Written in Python
      * Consumes from Kafka topics
      * Reads Avro with Schema Registry
      * Writes Avro to GCS

    We found out that the current sinks for Avro and Parquet with
    GCS as the destination are not supported for Python at the
    moment, which is basically the main blocker now.

    Any recommendations/suggestions would be really highly appreciated!

    Maybe the solution really does not exist and we need to create
    our own custom connector for it. The question in that case would
    be whether that’s even theoretically possible, since we really
    need to avoid another dead end.

    Thanks a lot for any help!

    Kind regards,

    Ondrej
