Yes, I think so~
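
As far as I know, the Kafka offsets are kept in the source operator's checkpointed state, and on restore from a savepoint that state is matched to the operator by its uid, which is why the uid has to land on the source itself. To make it clearer which operator the helper quoted further down in this thread touches, here is a minimal sketch in plain Python. `JavaLikeList` and `FakeTransformation` are hypothetical stand-ins that I made up for illustration (they are not PyFlink or Flink classes); they only mimic the `getInputs()`, `isEmpty()`, `get()` and `setUid()` calls that the helper uses.

```python
class JavaLikeList:
    """Hypothetical stand-in for the java.util.List methods the helper calls."""

    def __init__(self, items):
        self._items = list(items)

    def isEmpty(self):
        return len(self._items) == 0

    def get(self, index):
        return self._items[index]


class FakeTransformation:
    """Hypothetical stand-in for a Java Transformation reached through Py4J."""

    def __init__(self, name, inputs=None):
        self.name = name
        self.uid = None
        self._inputs = JavaLikeList(inputs or [])

    def getInputs(self):
        return self._inputs

    def setUid(self, uid):
        self.uid = uid


def set_uid_on_root(transformation, uid):
    """Follow the first input upstream until a node has no inputs, then set its uid."""
    node = transformation
    while not node.getInputs().isEmpty():
        node = node.getInputs().get(0)
    node.setUid(uid)
    return node


# A three-step chain: Kafka source -> Table-to-DataStream conversion -> map.
kafka_source = FakeTransformation("kafka-source")
convert = FakeTransformation("table-to-datastream", [kafka_source])
map_op = FakeTransformation("map", [convert])

root = set_uid_on_root(map_op, "kafka-source-uid")
```

In this toy chain only `kafka_source` receives the uid; the downstream nodes are left untouched. Note that the walk always follows the first input, so a job with several sources (a join or a union, for example) would need the helper applied once per branch.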

On Thu, Apr 28, 2022 at 11:00 AM lan tran <indigoblue7...@gmail.com> wrote:

> Hi Dian,
>
> Sorry for missing your mail. If I follow your suggestion and Flink somehow
> crashes and we have to restart the service, does the Flink job know the
> offset from which it should resume reading from Kafka?
>
> *From: *Dian Fu <dian0511...@gmail.com>
> *Sent: *Tuesday, April 26, 2022 7:54 AM
> *To: *lan tran <indigoblue7...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> The code in my last reply shows how to set the UID for a source operator
> generated by the Table API. I meant that you could first create the source
> using the Table API, then convert it to a DataStream, set the uid for the
> source operator using that code, and then perform the remaining operations
> with the DataStream API.
>
> Regards,
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 9:27 PM lan tran <indigoblue7...@gmail.com> wrote:
>
> Hi Dian,
>
> Thanks again for the fast response.
>
> Following your suggestion above, we can set the UID only for the
> DataStream stage (since you suggest converting from Table to DataStream).
>
> However, in the first phase, which collects the data from Kafka (in
> Debezium format), the UID cannot be set because we are using the Table API
> (which generates the UID automatically).
>
> Therefore, if there is a crash or we need to revert using a savepoint, we
> cannot use it for the first phase because we cannot set the UID there. So
> how can we revert?
>
> For that reason, we want to use DebeziumAvroRowDeserializationSchema
> and DebeziumJsonRowDeserializationSchema in the DataStream job, so that we
> can use the savepoint for the whole flow.
>
> Best,
> Quynh
>
> *From: *Dian Fu <dian0511...@gmail.com>
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran <indigoblue7...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> You could try the following code (although it may be a little hacky):
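> ```
> from pyflink.datastream import DataStream
>
>
> def set_uid_for_source(ds: DataStream, uid: str):
>     # Start from the transformation behind the given DataStream.
>     transformation = ds._j_data_stream.getTransformation()
>
>     # Follow the first input upstream until reaching the source,
>     # i.e. the transformation that has no inputs.
>     source_transformation = transformation
>     while not source_transformation.getInputs().isEmpty():
>         source_transformation = source_transformation.getInputs().get(0)
>
>     source_transformation.setUid(uid)
> ```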
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that
> sources using these formats send UPDATE messages to downstream operators.
>
> Regards
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran <indigoblue7...@gmail.com>
> wrote:
>
> Yeah, I already tried that. However, if we do not use the DataStream API
> from the start, we cannot use savepoints: according to the docs, if we use
> the Table API (SQL API), the uid is generated automatically, which means we
> cannot revert if the system crashes.
>
> Best,
> Quynh
>
> *From: *Dian Fu <dian0511...@gmail.com>
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran <indigoblue7...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in
> Python DataStream API.
>
> I just took a further look at the Java implementation of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema:
> the result type is RowData instead of Row, so it would not be that easy to
> support them directly in the Python DataStream API. However, PyFlink
> supports conversion between the Table API and the DataStream API[1]. Could
> you first create a Table that consumes data from Kafka and then convert it
> to a DataStream?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu <dian0511...@gmail.com> wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create them in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated, as it is simply a wrapper around the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran <indigoblue7...@gmail.com>
> wrote:
>
> Thanks, Dian! I really appreciate this.
>
> However, I have another question related to this. In the current version,
> or in any future update, does the DataStream API support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink? I looked at the
> documentation and it seems they are not supported yet.
>
> Best,
> Quynh
>
> *From: *Dian Fu <dian0511...@gmail.com>
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran <indigoblue7...@gmail.com>
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> I have added an example of how to use AvroRowDeserializationSchema in the
> Python DataStream API in [1]. Please take a look and see if that helps~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>
>
>
> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu <dian0511...@gmail.com> wrote:
>
> Hi Quynh,
>
> Could you show some sample code on how you use it?
>
> Regards,
> Dian
>
>
>
> On Fri, Apr 22, 2022 at 1:42 PM lan tran <indigoblue7...@gmail.com> wrote:
>
> I wonder whether this is a bug, but when I use
> *AvroRowDeserializationSchema* in PyFlink, the error still occurs:
>
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>
> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
> org.apache.avro.Schema$RecordSchema]) does not exist
>
> Could you please help check? Thanks.
>
> Best,
> Quynh
>
> *From: *lan tran <indigoblue7...@gmail.com>
> *Sent: *Thursday, April 21, 2022 1:43 PM
> *To: *user@flink.apache.org
> *Subject: *AvroRowDeserializationSchema
>
>
>
> Hi team,
>
> I want to use AvroRowDeserializationSchema when consuming data from
> Kafka. However, from the documentation I did not understand what
> avro_schema_string and record_class are. It would be great if you could
> give me an example of this (I only have an example in Java, but I am
> working with PyFlink).
>
> As I understand it, is avro_schema_string the schema_registry_url? Does it
> support
>  'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' as
> in the Table API?
>
> Best,
> Quynh.
>