Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-26 Thread Ondřej Pánek
Hello XQ,

thanks for letting me know! I'm looking forward to this functionality becoming 
available, as it would be very helpful for our use case.

Best regards,

Ondrej


Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-23 Thread XQ Hu via user
My attempt to fix https://github.com/apache/beam/issues/25598:
https://github.com/apache/beam/pull/30728

Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-21 Thread Ondřej Pánek
Hello Jan,

thanks a lot for the detailed answer! During the last week, the customer changed 
their requirements: the output should now go to BigQuery. That, as we understand 
it, is available even in the Beam Python SDK, and we already have a PoC for it.
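
For context, a minimal sketch of what such a Kafka-to-BigQuery pipeline looks like 
in the Python SDK (not our actual PoC; the broker, topic, table name and one-column 
schema are placeholders, and the decode step assumes plain UTF-8 values):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker:9092',
                             'auto.offset.reset': 'earliest'},
            topics=['cdc-topic'])
        # ReadFromKafka yields (key, value) byte pairs; turn them into BQ rows.
        | 'To rows' >> beam.MapTuple(
            lambda k, v: {'content': (v or b'').decode('utf-8', 'ignore')})
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            table='my-project:ingest.cdc_events',
            schema='content:STRING',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))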

You have a very good point with bullet c). That's indeed our case now. The data 
will really only be transferred from topics on the Kafka MirrorMaker cluster 
(managed by the customer, sitting in GCP) to BigQuery, rather than undergoing any 
heavy transformations. However, the number of topics is quite large (hundreds), 
and the customer wants the flexibility to add/remove topics so that the transfer 
job picks up the changes dynamically.

TBH, we also started to think about PySpark Streaming on Dataproc and created a 
PoC there as well. It looks more lightweight than Dataflow & Beam for the initial 
runs.

Also, and you will definitely know better, it looks like offset management in 
Beam/Dataflow streaming is a bit of a “black box” compared to the external offset 
storage in Spark Streaming. The problem I'm having with Dataflow now is that after 
the job is removed, the internal state, and hence the offsets, is reset, and one 
needs to make sure (somehow?) that the consumed data is neither duplicated nor 
lost when the Dataflow job is restarted.
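
One thing we still plan to try here (just a sketch under my assumptions, not 
verified yet): the Python ReadFromKafka wrapper exposes a commit_offset_in_finalize 
flag, which together with a fixed group.id should commit offsets back to Kafka once 
bundles are finalized, so a recreated job can resume from the committed position 
instead of relying only on Dataflow's internal state. Broker, topic and group names 
below are placeholders:

from apache_beam.io.kafka import ReadFromKafka

# Sketch of a Kafka source that commits consumed offsets back to the broker
# (hypothetical broker/topic/group names); exactly-once delivery would still
# need an idempotent or deduplicating sink downstream.
kafka_source = ReadFromKafka(
    consumer_config={
        'bootstrap.servers': 'broker:9092',
        'group.id': 'dataflow-ingest',     # offsets are stored under this group
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': 'false',     # let Beam do the committing
    },
    topics=['cdc-topic'],
    commit_offset_in_finalize=True,        # commit once the checkpoint is finalized
)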

Another Kafka cluster with Kafka Connect is not really an option, again based on 
the customer's requirements. The Data Engineering team wants to have full control 
over this ingestion solution, and Kafka cluster management is not in their scope; 
what's more, neither is Java in general.

Thanks for the answers and opinions so far!

Best,

Ondřej




Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Jan Lukavský
Self-correction: as you are using a streaming pipeline without final watermark 
emission (I suppose), option (a) will not work. Patching the sink to support 
generic windowing would probably be much more involved.



Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Jan Lukavský

Hi Ondřej,

I'll start with a disclaimer: I'm not exactly an expert on either the Python SDK 
or ParquetIO, so please take these just as suggestions off the top of my head.


First, it seems that the current implementation of WriteToParquet really 
does not play well with streaming pipelines. There are several options 
that could be used to overcome this limitation:


 a) you can try fixing the sink; maybe adding an AfterWatermark.pastEndOfWindow() 
trigger would be enough to make it work (needs to be tested)


 b) if the Java implementation of ParquetIO works for streaming pipelines (and I 
would suppose it does), you can use a cross-language transform to run ParquetIO 
from Python; see [1] for a quick start


 c) generally speaking, using a full-blown streaming engine for tasks like 
"buffer this and store it in bulk after a timeout" is inefficient. An alternative 
approach would be to just use a KafkaConsumer, create Parquet files on local disk, 
push them to GCS, and commit offsets afterwards. Streaming engines buffer data in 
replicated distributed state, which adds unneeded complexity.
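
To make (c) a bit more concrete, a rough sketch (assuming the kafka-python, 
pyarrow and google-cloud-storage packages; topic, bucket and flush interval are 
placeholders, and error handling and Avro decoding are left out):

import time

import pyarrow as pa
import pyarrow.parquet as pq
from google.cloud import storage
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'input-topic',
    bootstrap_servers='broker:9092',
    group_id='gcs-parquet-sink',
    enable_auto_commit=False,          # commit only after the upload succeeded
    auto_offset_reset='earliest')
bucket = storage.Client().bucket('my-bucket')

buffer, deadline = [], time.time() + 60
for msg in consumer:
    buffer.append({'content': (msg.value or b'').decode('utf-8', 'ignore')})
    if time.time() >= deadline and buffer:
        name = 'batch-%d.parquet' % int(time.time())
        pq.write_table(pa.Table.from_pylist(buffer), '/tmp/' + name)  # 1. local file
        bucket.blob('ingest/' + name).upload_from_filename('/tmp/' + name)  # 2. GCS
        consumer.commit()                                             # 3. offsets last
        buffer, deadline = [], time.time() + 60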


 d) if there is some non-trivial processing between consuming elements from 
Kafka and writing outputs, an alternative might be to process the data in a 
streaming pipeline, write the outputs back to Kafka, and then use approach (c) to 
get them to GCS
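
For the "write back to Kafka" part of (d), the Python SDK ships a cross-language 
WriteToKafka; a small sketch (broker and topic names are placeholders, and the 
transform expects (key, value) pairs of bytes):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    _ = (
        p
        | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker:9092'},
            topics=['raw-events'])
        # stand-in for the non-trivial processing; keys/values stay bytes
        | beam.MapTuple(lambda k, v: (k or b'', v.upper()))
        | WriteToKafka(
            producer_config={'bootstrap.servers': 'broker:9092'},
            topic='processed-events'))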


The specific solution depends on the actual requirements of your customers.

Best,

 Jan

[1] 
https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/



Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Ondřej Pánek
Basically, this is the error we receive when trying to use the Avro or Parquet 
sinks (attached image). Also, check the sample pipeline below that triggers this 
error (when deploying with DataflowRunner). Obviously, there is no global window 
or default trigger there. That is, I believe, what's described in the issue: 
https://github.com/apache/beam/issues/25598


import apache_beam as beam
import pyarrow
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.parquetio import WriteToParquet
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (AccumulationMode, AfterProcessingTime,
                                             Repeatedly)

# options, kafka_bootstrap_servers, kafka_topic, gcs_output_path, log_element and
# Transformer are defined elsewhere in the original pipeline file.
with beam.Pipeline(options=options) as pipeline:
    # Read messages from Kafka
    # TODO: Maybe it would be possible to transform the unbounded pcollection to a
    # bounded one somehow?
    kafka_messages = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': kafka_bootstrap_servers,
                             'auto.offset.reset': 'earliest'},
            topics=[kafka_topic])
        | 'Log messages before PT' >> beam.Map(log_element)
        | 'Convert to string' >> beam.ParDo(Transformer())
        | 'Fixed window 5s' >> beam.WindowInto(
            window.FixedWindows(5),
            trigger=Repeatedly(AfterProcessingTime(5)),
            accumulation_mode=AccumulationMode.DISCARDING)
        | 'Log messages after PT' >> beam.Map(log_element)
        | 'Write to files' >> WriteToParquet(
            file_path_prefix=gcs_output_path,
            schema=pyarrow.schema([('content', pyarrow.string())]))
    )

Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Ondřej Pánek
Hello, thanks for the reply!

Please, refer to these:


  *   https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  *   https://github.com/apache/beam/issues/25598

Best,

Ondrej



Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-13 Thread XQ Hu via user
Can you explain more about "that current sinks for Avro and Parquet with the 
destination of GCS are not supported"?

We do have AvroIO and ParquetIO (
https://beam.apache.org/documentation/io/connectors/) in Python.
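
For example, a minimal batch-mode sketch with the Python WriteToAvro (the bucket 
path and the one-field schema are just placeholders):

import apache_beam as beam
from apache_beam.io.avroio import WriteToAvro

schema = {
    'type': 'record',
    'name': 'Event',
    'fields': [{'name': 'content', 'type': 'string'}],
}

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([{'content': 'hello'}, {'content': 'world'}])
        | WriteToAvro('gs://my-bucket/out/events', schema=schema,
                      file_name_suffix='.avro'))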

On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek wrote:

> Hello Beam team!
>
> We're currently onboarding a customer's infrastructure to the Google Cloud
> Platform. The decision was made that one of the technologies they will use
> is Dataflow. Let me briefly describe the use-case specification:
>
> They have a Kafka cluster where data from a CDC data source is stored. The
> data in the topics is stored in Avro format. Their other requirement is that
> they want a streaming solution reading from these Kafka topics and writing
> to Google Cloud Storage, again in Avro. What's more, the component should be
> written in Python, since their Data Engineers heavily prefer Python over
> Java.
>
> We've been struggling with the design of the solution for a couple of weeks
> now, and we're facing a rather unfortunate situation, not really finding any
> solution that would fit these requirements.
>
> So the question is: Is there any existing Dataflow template/solution with
> the following specifications:
>
>- Streaming connector
>- Written in Python
>- Consumes from Kafka topics
>- Reads Avro with Schema Registry
>- Writes Avro to GCS
>
> We found out that the current sinks for Avro and Parquet with the
> destination of GCS are not supported for Python at the moment, which is
> basically the main blocker now.
>
> Any recommendations/suggestions would be highly appreciated!
>
> Maybe the solution really does not exist and we need to create our own
> custom connector for it. The question in this case would be whether that's
> even possible theoretically, since we would really need to avoid another
> dead end.
>
> Thanks a lot for any help!
>
> Kind regards,
>
> Ondrej