Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-26 Thread Ondřej Pánek
Hello XQ,

thanks for letting me know! I’m looking forward to when this functionality becomes 
available, as it would be very helpful for our use case.

Best regards,

Ondrej

From: XQ Hu via user 
Date: Sunday, March 24, 2024 at 02:43
To: user@beam.apache.org 
Cc: XQ Hu 
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
My attempt to fix https://github.com/apache/beam/issues/25598: 
https://github.com/apache/beam/pull/30728

On Thu, Mar 21, 2024 at 10:35 AM Ondřej Pánek <ondrej.pa...@bighub.cz> wrote:
Hello Jan,

thanks a lot for the detailed answer! During the last week, the customer changed 
their requirements: the output should now go to BigQuery. That, as we understand 
it, is available in the Beam Python SDK, and we already have a PoC for it.
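
For illustration only, a minimal sketch of the kind of Kafka-to-BigQuery streaming 
pipeline meant here, using the Python SDK; the broker, topic, table, schema, and the 
JSON-parsing step are placeholders, not the actual PoC:

import json
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions


def parse_record(kv):
    # ReadFromKafka yields (key, value) byte pairs by default; assume JSON payloads.
    _, value = kv
    return json.loads(value.decode('utf-8'))


options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | 'Read from Kafka' >> ReadFromKafka(
         consumer_config={'bootstrap.servers': 'broker:9092'},
         topics=['cdc-topic'])
     | 'Parse' >> beam.Map(parse_record)
     | 'Write to BigQuery' >> WriteToBigQuery(
         table='my-project:my_dataset.cdc_events',
         schema='id:STRING,op:STRING,ts:TIMESTAMP',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))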

You have a very good point with bullet c). That’s indeed our case now. The data 
will really only be transferred from topics on the Kafka MirrorMaker cluster 
(managed by the customer) running in GCP to BigQuery, rather than going through 
any heavy transformations. However, the number of topics is quite large 
(hundreds), and the customer wants the flexibility to add/remove topics so that 
the transfer job picks up the changes dynamically.

TBH we have also started to think about PySpark Streaming on Dataproc and created 
a PoC there as well. It looks more lightweight than Dataflow & Beam for the 
initial runs.

Also, and you will definitely know better, it looks like offset management in 
Beam/Dataflow streaming is a bit of a “black box” compared to the external 
offset storage in Spark Streaming. The problem I’m having with Dataflow now is 
that after a job is removed, its internal state, and hence the offsets, is 
reset, and one needs to make sure (somehow?) that the consumed data is neither 
duplicated nor lost when the Dataflow job is restarted.
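
For reference, a sketch of what resuming from committed offsets could look like 
with the Python Kafka source; it assumes the commit_offset_in_finalize flag and a 
stable group.id behave as documented, and it does not by itself guarantee 
exactly-once behaviour across Dataflow job restarts (broker, topic, and group are 
placeholders):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:
    records = (
        p
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'broker:9092',
                # A stable group.id lets a restarted job resume from the
                # offsets committed by the previous run.
                'group.id': 'dataflow-cdc-ingest',
                'auto.offset.reset': 'earliest',
            },
            topics=['cdc-topic'],
            # Commit offsets back to Kafka after bundle finalization,
            # instead of relying only on the job's internal state.
            commit_offset_in_finalize=True)
        | 'Use records' >> beam.Map(lambda kv: kv))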

Another Kafka cluster with Kafka Connect is not really an option, again based 
on the customer’s requirements. The Data Engineering team wants to have full 
control over this ingestion solution, and Kafka cluster management is not in 
their scope; neither, for that matter, is Java in general.

Thanks for the answers and opinions so far!

Best,

Ondřej



From: Jan Lukavský <je...@seznam.cz>
Date: Thursday, March 14, 2024 at 14:13
To: user@beam.apache.org
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python

Self-correction: since you are using a streaming pipeline without final watermark 
emission (I suppose), option (a) will not work. Patching the sink to support 
generic windowing would probably be much more involved.
On 3/14/24 14:07, Jan Lukavský wrote:

Hi Ondřej,

I'll start with a disclaimer: I'm not exactly an expert on either the Python SDK 
or ParquetIO, so please take these just as suggestions off the top of my head.

First, it seems that the current implementation of WriteToParquet really does 
not play well with streaming pipelines. There are several options that could be 
used to overcome this limitation:

 a) you can try fixing the sink; adding an AfterWatermark.pastEndOfWindow() 
trigger might be enough to make it work (needs to be tested)

 b) if the Java implementation of ParquetIO works for streaming pipelines (and 
I would suppose it does), you can use a cross-language transform to run ParquetIO 
from Python; see [1] for a quick start

 c) generally speaking, using a full-blown streaming engine for tasks like 
"buffer this and store it in bulk after a timeout" is inefficient. An alternative 
approach would be to simply use a KafkaConsumer, create Parquet files on local 
disk, push them to GCS, and commit offsets afterwards (a rough sketch follows 
below). Streaming engines buffer data in replicated distributed state, which adds 
unneeded complexity

 d) if there is some non-trivial processing between consuming elements from 
Kafka and writing the outputs, then an alternative is to process the data in a 
streaming pipeline, write the outputs back to Kafka, and then use approach (c) to 
get them to GCS

The specific solution depends on the actual requirements of your customers.
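
Regarding option (c), a rough sketch of what such a plain-consumer approach could 
look like; confluent-kafka, pyarrow, and google-cloud-storage are one possible 
choice of libraries, the topic, bucket, and batch size are placeholders, and error 
and schema handling are omitted:

import pyarrow as pa
import pyarrow.parquet as pq
from confluent_kafka import Consumer
from google.cloud import storage

consumer = Consumer({
    'bootstrap.servers': 'broker:9092',
    'group.id': 'gcs-batcher',
    'enable.auto.commit': False,   # commit manually, only after the upload
    'auto.offset.reset': 'earliest',
})
consumer.subscribe(['cdc-topic'])
bucket = storage.Client().bucket('my-bucket')

batch, batch_no = [], 0
while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        batch.append(msg.value().decode('utf-8'))
    if len(batch) >= 10000:   # or flush on a timer
        local_path = '/tmp/batch-%d.parquet' % batch_no
        # Buffer locally as Parquet, push to GCS, then commit offsets.
        pq.write_table(pa.table({'content': batch}), local_path)
        bucket.blob('ingest/batch-%d.parquet' % batch_no).upload_from_filename(local_path)
        consumer.commit(asynchronous=False)   # offsets committed only after the file is in GCS
        batch, batch_no = [], batch_no + 1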

Best,

 Jan

[1] https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
On 3/14/24 09:34, Ondřej Pánek wrote:
Basically, this is the error we receive when trying to use the Avro or Parquet 
sinks (attached image). Also, see the sample pipeline that triggers this error 
(when deploying with DataflowRunner). So obviously there is no global window or 
default trigger there. That’s, I believe, what’s described in the issue: 
https://github.com/apache/beam/issues/25598


From: Ondřej Pánek <ondrej.pa...@bighub.cz>
Date: Thursday, March 14, 2024 at 07:57
To: user@beam.apache.org
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
Hello, thanks for the reply!

Please, refer to these:


  *   https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836

Offset access in Kafka messages in Python

2024-03-26 Thread Ondřej Pánek
Hello team,

Is it possible to somehow retrieve metadata such as the topic, partition, and 
offset from the messages consumed from the Kafka source? I mean, is it possible 
to do so in Python? I understand that in Java there is the KafkaRecord construct, 
which exposes this metadata, but I haven’t found any equivalent in Python.

The reason is that we need to consume CDC data, where the offset indicates the 
message’s order, i.e. the order of the CDC operations, and we need to store it 
so we can process the data later in BigQuery.
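
A sketch of what this could look like in Python, assuming ReadFromKafka's 
with_metadata flag exposes topic/partition/offset as fields on the returned rows; 
the exact field names should be verified against the Beam version in use, and the 
broker and topic are placeholders:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka


def to_ordered_record(row):
    # Keep the offset so the CDC operations can be ordered later in BigQuery.
    return {
        'topic': row.topic,
        'partition': row.partition,
        'offset': row.offset,
        'payload': row.value,
    }


with beam.Pipeline() as p:
    (p
     | 'Read with metadata' >> ReadFromKafka(
         consumer_config={'bootstrap.servers': 'broker:9092'},
         topics=['cdc-topic'],
         with_metadata=True)   # rows instead of plain (key, value) pairs
     | 'Attach ordering info' >> beam.Map(to_ordered_record)
     | 'Debug print' >> beam.Map(print))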

Best regards,

Ondrej


Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-21 Thread Ondřej Pánek
Hello Jan,

thanks a lot for the detailed answer! During the last week, the customer changed 
their requirements: the output should now go to BigQuery. That, as we understand 
it, is available in the Beam Python SDK, and we already have a PoC for it.

You have a very good point with bullet c). That’s indeed our case now. The data 
will really only be transferred from topics on the Kafka MirrorMaker cluster 
(managed by the customer) running in GCP to BigQuery, rather than going through 
any heavy transformations. However, the number of topics is quite large 
(hundreds), and the customer wants the flexibility to add/remove topics so that 
the transfer job picks up the changes dynamically.

TBH we have also started to think about PySpark Streaming on Dataproc and created 
a PoC there as well. It looks more lightweight than Dataflow & Beam for the 
initial runs.

Also, and you will definitely know better, it looks like offset management in 
Beam/Dataflow streaming is a bit of a “black box” compared to the external 
offset storage in Spark Streaming. The problem I’m having with Dataflow now is 
that after a job is removed, its internal state, and hence the offsets, is 
reset, and one needs to make sure (somehow?) that the consumed data is neither 
duplicated nor lost when the Dataflow job is restarted.

Another Kafka cluster with Kafka Connect is not really an option, again based 
on the customer’s requirements. The Data Engineering team wants to have full 
control over this ingestion solution, and Kafka cluster management is not in 
their scope; neither, for that matter, is Java in general.

Thanks for the answers and opinions so far!

Best,

Ondřej



From: Jan Lukavský 
Date: Thursday, March 14, 2024 at 14:13
To: user@beam.apache.org 
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python

Self-correction: since you are using a streaming pipeline without final watermark 
emission (I suppose), option (a) will not work. Patching the sink to support 
generic windowing would probably be much more involved.
On 3/14/24 14:07, Jan Lukavský wrote:

Hi Ondřej,

I'll start with a disclaimer: I'm not exactly an expert on either the Python SDK 
or ParquetIO, so please take these just as suggestions off the top of my head.

First, it seems that the current implementation of WriteToParquet really does 
not play well with streaming pipelines. There are several options that could be 
used to overcome this limitation:

 a) you can try fixing the sink; adding an AfterWatermark.pastEndOfWindow() 
trigger might be enough to make it work (needs to be tested)

 b) if the Java implementation of ParquetIO works for streaming pipelines (and 
I would suppose it does), you can use a cross-language transform to run ParquetIO 
from Python; see [1] for a quick start

 c) generally speaking, using a full-blown streaming engine for tasks like 
"buffer this and store it in bulk after a timeout" is inefficient. An alternative 
approach would be to simply use a KafkaConsumer, create Parquet files on local 
disk, push them to GCS, and commit offsets afterwards. Streaming engines buffer 
data in replicated distributed state, which adds unneeded complexity

 d) if there is some non-trivial processing between consuming elements from 
Kafka and writing the outputs, then an alternative is to process the data in a 
streaming pipeline, write the outputs back to Kafka, and then use approach (c) to 
get them to GCS

The specific solution depends on the actual requirements of your customers.

Best,

 Jan

[1] https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
On 3/14/24 09:34, Ondřej Pánek wrote:
Basically, this is the error we receive when trying to use the Avro or Parquet 
sinks (attached image). Also, see the sample pipeline that triggers this error 
(when deploying with DataflowRunner). So obviously there is no global window or 
default trigger there. That’s, I believe, what’s described in the issue: 
https://github.com/apache/beam/issues/25598


From: Ondřej Pánek <ondrej.pa...@bighub.cz>
Date: Thursday, March 14, 2024 at 07:57
To: user@beam.apache.org
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
Hello, thanks for the reply!

Please, refer to these:


  *   
https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  *   https://github.com/apache/beam/issues/25598

Best,

Ondrej

From: XQ Hu via user <user@beam.apache.org>
Date: Thursday, March 14, 2024 at 02:32
To: user@beam.apache.org
Cc: XQ Hu <x...@google.com>
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
Can you explain more about "that current sinks for Avro and Parquet with the 
destination of GCS are not supported"?

We do have AvroIO and ParquetIO 
(https://beam.apache.org/documentation/io/connectors/) in Python.

Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Ondřej Pánek
Basically, this is the error we receive when trying to use the Avro or Parquet 
sinks (attached image). Also, see the sample pipeline that triggers this error 
(when deploying with DataflowRunner). So obviously there is no global window or 
default trigger there. That’s, I believe, what’s described in the issue: 
https://github.com/apache/beam/issues/25598


From: Ondřej Pánek 
Date: Thursday, March 14, 2024 at 07:57
To: user@beam.apache.org 
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
Hello, thanks for the reply!

Please, refer to these:


  *   
https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  *   https://github.com/apache/beam/issues/25598

Best,

Ondrej

From: XQ Hu via user 
Date: Thursday, March 14, 2024 at 02:32
To: user@beam.apache.org 
Cc: XQ Hu 
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
Can you explain more about "that current sinks for Avro and Parquet with the 
destination of GCS are not supported"?

We do have AvroIO and ParquetIO 
(https://beam.apache.org/documentation/io/connectors/) in Python.
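
For reference, a minimal batch example of the Python Avro sink (the output path 
and schema are illustrative); the limitation discussed in this thread is 
specifically about using these sinks in streaming/unbounded pipelines:

import apache_beam as beam
from apache_beam.io.avroio import WriteToAvro

schema = {
    'namespace': 'example.avro', 'type': 'record', 'name': 'Content',
    'fields': [{'name': 'content', 'type': 'string'}],
}

with beam.Pipeline() as p:
    (p
     | 'Create' >> beam.Create([{'content': 'hello'}, {'content': 'world'}])
     | 'Write Avro' >> WriteToAvro('/tmp/out', schema=schema,
                                   file_name_suffix='.avro'))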

On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek <ondrej.pa...@bighub.cz> wrote:
Hello Beam team!

We’re currently onboarding the customer’s infrastructure to the Google Cloud 
Platform. The decision was made that one of the technologies they will use is 
Dataflow. Let me briefly describe the use-case specification:
They have a Kafka cluster where data from a CDC data source is stored. The data 
in the topics is stored in Avro format. Their other requirement is a streaming 
solution that reads from these Kafka topics and writes to Google Cloud Storage, 
again in Avro. What’s more, the component should be written in Python, since 
their Data Engineers heavily prefer Python over Java.

We’ve been struggling with the design of the solution for a couple of weeks now, 
and we’re in quite an unfortunate situation, not really finding any solution 
that fits these requirements.

So the question is: Is there any existing Dataflow template/solution with the 
following specifications:

  *   Streaming connector
  *   Written in Python
  *   Consumes from Kafka topics
  *   Reads Avro with Schema Registry
  *   Writes Avro to GCS

We found out that the current sinks for Avro and Parquet with GCS as the 
destination are not supported for Python at the moment, which is basically the 
main blocker now.

Any recommendations/suggestions would be really highly appreciated!

Maybe the solution really does not exist and we need to create our own custom 
connector for it. The question in that case would be whether that’s even 
theoretically possible, since we really need to avoid another dead end.

Thanks a lot for any help!

Kind regards,
Ondrej
# Imports added for completeness; `options`, `kafka_bootstrap_servers`,
# `kafka_topic`, `gcs_output_path`, `log_element`, and `Transformer` are
# assumed to be defined elsewhere in the original script.
import pyarrow
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.parquetio import WriteToParquet
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (AccumulationMode,
                                             AfterProcessingTime, Repeatedly)

with beam.Pipeline(options=options) as pipeline:
    # Read messages from Kafka
    # TODO: Maybe it would be possible to transform the unbounded
    # pcollection to a bounded one somehow?
    kafka_messages = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': kafka_bootstrap_servers,
                'auto.offset.reset': 'earliest',
            },
            topics=[kafka_topic])
        | 'Log messages before PT' >> beam.Map(log_element)
        | 'Convert to string' >> beam.ParDo(Transformer())
        | 'Fixed window 5s' >> beam.WindowInto(
            window.FixedWindows(5),
            trigger=Repeatedly(AfterProcessingTime(5)),
            accumulation_mode=AccumulationMode.DISCARDING,
        )
        | 'Log messages after PT' >> beam.Map(log_element)
        # WriteToParquet is the step that triggers the streaming error
        # described above (https://github.com/apache/beam/issues/25598).
        | 'Write to files' >> WriteToParquet(
            file_path_prefix=gcs_output_path,
            schema=pyarrow.schema([('content', pyarrow.string())]),
        )
    )

Re: Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-14 Thread Ondřej Pánek
Hello, thanks for the reply!

Please, refer to these:


  *   
https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
  *   https://github.com/apache/beam/issues/25598

Best,

Ondrej

From: XQ Hu via user 
Date: Thursday, March 14, 2024 at 02:32
To: user@beam.apache.org 
Cc: XQ Hu 
Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
Can you explain more about "that current sinks for Avro and Parquet with the 
destination of GCS are not supported"?

We do have AvroIO and ParquetIO 
(https://beam.apache.org/documentation/io/connectors/) in Python.

On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek <ondrej.pa...@bighub.cz> wrote:
Hello Beam team!

We’re currently onboarding the customer’s infrastructure to the Google Cloud 
Platform. The decision was made that one of the technologies they will use is 
Dataflow. Let me briefly describe the use-case specification:
They have a Kafka cluster where data from a CDC data source is stored. The data 
in the topics is stored in Avro format. Their other requirement is a streaming 
solution that reads from these Kafka topics and writes to Google Cloud Storage, 
again in Avro. What’s more, the component should be written in Python, since 
their Data Engineers heavily prefer Python over Java.

We’ve been struggling with the design of the solution for a couple of weeks now, 
and we’re in quite an unfortunate situation, not really finding any solution 
that fits these requirements.

So the question is: Is there any existing Dataflow template/solution with the 
following specifications:

  *   Streaming connector
  *   Written in Python
  *   Consumes from Kafka topics
  *   Reads Avro with Schema Registry
  *   Writes Avro to GCS

We found out that the current sinks for Avro and Parquet with GCS as the 
destination are not supported for Python at the moment, which is basically the 
main blocker now.

Any recommendations/suggestions would be really highly appreciated!

Maybe the solution really does not exist and we need to create our own custom 
connector for it. The question in that case would be whether that’s even 
theoretically possible, since we really need to avoid another dead end.

Thanks a lot for any help!

Kind regards,
Ondrej


Specific use-case question - Kafka-to-GCS-avro-Python

2024-03-13 Thread Ondřej Pánek
Hello Beam team!

We’re currently onboarding the customer’s infrastructure to the Google Cloud 
Platform. The decision was made that one of the technologies they will use is 
Dataflow. Let me briefly describe the use-case specification:
They have a Kafka cluster where data from a CDC data source is stored. The data 
in the topics is stored in Avro format. Their other requirement is a streaming 
solution that reads from these Kafka topics and writes to Google Cloud Storage, 
again in Avro. What’s more, the component should be written in Python, since 
their Data Engineers heavily prefer Python over Java.

We’ve been struggling with the design of the solution for a couple of weeks now, 
and we’re in quite an unfortunate situation, not really finding any solution 
that fits these requirements.

So the question is: Is there any existing Dataflow template/solution with the 
following specifications:

  *   Streaming connector
  *   Written in Python
  *   Consumes from Kafka topics
  *   Reads Avro with Schema Registry
  *   Writes Avro to GCS

We found out that the current sinks for Avro and Parquet with GCS as the 
destination are not supported for Python at the moment, which is basically the 
main blocker now.

Any recommendations/suggestions would be really highly appreciated!

Maybe the solution really does not exist and we need to create our own custom 
connector for it. The question in that case would be whether that’s even 
theoretically possible, since we really need to avoid another dead end.

Thanks a lot for any help!

Kind regards,
Ondrej