Hi Thomas,

I have reviewed the code and noticed that heartbeat.action.query is not
mandatory. Debezium generates heartbeat events at regular intervals; Flink
CDC receives these heartbeat events and advances the offset [1]. Finally,
the source reader commits the offset during checkpointing in the streaming
phase [2].

Therefore, you may want to verify that checkpointing is enabled and that
the job has entered the streaming phase (i.e., it is only reading the WAL).
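
For reference, checkpointing is enabled on the StreamExecutionEnvironment;
a minimal sketch (the 10-second interval is just an illustrative value):

```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Without checkpointing, the source reader never commits offsets,
// so confirmed_flush_lsn will not advance during the streaming phase.
env.enableCheckpointing(10000L) // checkpoint every 10 seconds (illustrative)
```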

[1]
https://github.com/apache/flink-cdc/blob/d386c7c1c2db3bb8590be6c20198286cf0567c97/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java#L119

[2]
https://github.com/apache/flink-cdc/blob/d386c7c1c2db3bb8590be6c20198286cf0567c97/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReaderWithCommit.java#L93

On Sat, May 18, 2024 at 12:34 AM Thomas Peyric <thomas.pey...@asklocala.com>
wrote:

> Thanks Hongshun for your response!
>
> On Fri, May 17, 2024 at 07:51, Hongshun Wang <loserwang1...@gmail.com>
> wrote:
>
>> Hi Thomas,
>>
>> The Debezium docs say: for the connector to detect and process events from
>> a heartbeat table, you must add the table to the PostgreSQL publication
>> specified by the publication.name
>> <https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-publication-name>
>> property. If this publication predates your Debezium deployment, the
>> connector uses the publication as defined. If the publication is not
>> already configured to automatically replicate changes FOR ALL TABLES in
>> the database, you must explicitly add the heartbeat table to the
>> publication [2].
>>
>> Thus, if you want to use heartbeat in CDC (see the sketch after this list):
>>
>>    1. Add the heartbeat table to the publication: ALTER PUBLICATION
>>       <publicationName> ADD TABLE <heartbeatTableName>;
>>    2. Set heartbeatInterval.
>>    3. Add debezium.heartbeat.action.query
>>       <https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query>
>>       [3]
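>>
>> For example, a minimal sketch of steps 2 and 3 in the source builder (the
>> heartbeat table and query are illustrative placeholders; when passed via
>> debeziumProperties, the property name drops the debezium. prefix; imports
>> of java.util.Properties, java.time.Duration, and PostgresSourceBuilder
>> are assumed):
>>
>> ```
>> val dbzProps = new Properties()
>> // Step 3: the statement Debezium runs on each heartbeat;
>> // the heartbeat table name here is a placeholder.
>> dbzProps.setProperty(
>>   "heartbeat.action.query",
>>   "INSERT INTO public.debezium_heartbeat (ts) VALUES (now())")
>>
>> val source = PostgresSourceBuilder.PostgresIncrementalSource.builder[String]()
>>   .heartbeatInterval(Duration.ofSeconds(10)) // Step 2: illustrative value
>>   .debeziumProperties(dbzProps)
>>   // ... other required options (hostname, database, slotName, ...) omitted
>>   .build()
>> ```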
>>
>> However, when I use it in CDC, an exception occurs:
>>
>> Caused by: java.lang.NullPointerException
>>     at io.debezium.heartbeat.HeartbeatFactory.createHeartbeat(HeartbeatFactory.java:55)
>>     at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:127)
>>     at io.debezium.pipeline.EventDispatcher.<init>(EventDispatcher.java:94)
>>
>> It seems CDC doesn't pass a HeartbeatConnectionProvider when configuring
>> the PostgresEventDispatcher:
>>
>> // org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext#configure
>> this.postgresDispatcher =
>>         new PostgresEventDispatcher<>(
>>                 dbzConfig,
>>                 topicSelector,
>>                 schema,
>>                 queue,
>>                 dbzConfig.getTableFilters().dataCollectionFilter(),
>>                 DataChangeEvent::new,
>>                 metadataProvider,
>>                 schemaNameAdjuster);
>>
>>
>> In Debezium, when PostgresConnectorTask starts, it does pass one:
>>
>> // io.debezium.connector.postgresql.PostgresConnectorTask#start
>> final PostgresEventDispatcher<TableId> dispatcher = new PostgresEventDispatcher<>(
>>         connectorConfig,
>>         topicNamingStrategy,
>>         schema,
>>         queue,
>>         connectorConfig.getTableFilters().dataCollectionFilter(),
>>         DataChangeEvent::new,
>>         PostgresChangeRecordEmitter::updateSchema,
>>         metadataProvider,
>>         connectorConfig.createHeartbeat(
>>                 topicNamingStrategy,
>>                 schemaNameAdjuster,
>>                 () -> new PostgresConnection(
>>                         connectorConfig.getJdbcConfig(),
>>                         PostgresConnection.CONNECTION_GENERAL),
>>                 exception -> {
>>                     String sqlErrorId = exception.getSQLState();
>>                     switch (sqlErrorId) {
>>                         case "57P01":
>>                             // Postgres error admin_shutdown, see
>>                             // https://www.postgresql.org/docs/12/errcodes-appendix.html
>>                             throw new DebeziumException(
>>                                     "Could not execute heartbeat action query (Error: "
>>                                             + sqlErrorId + ")", exception);
>>                         case "57P03":
>>                             // Postgres error cannot_connect_now, see
>>                             // https://www.postgresql.org/docs/12/errcodes-appendix.html
>>                             throw new RetriableException(
>>                                     "Could not execute heartbeat action query (Error: "
>>                                             + sqlErrorId + ")", exception);
>>                         default:
>>                             break;
>>                     }
>>                 }),
>>         schemaNameAdjuster,
>>         signalProcessor);
>>
>> Thus, I have created a new Jira ticket [4] to fix it.
>>
>>
>>
>>  [1]
>> https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/legacy-flink-cdc-sources/postgres-cdc/
>>
>> [2]
>> https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-interval-ms
>>
>> [3]
>> https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-heartbeat-action-query
>>
>> [4] https://issues.apache.org/jira/browse/FLINK-35387
>>
>>
>> Best
>>
>> Hongshun
>>
>> On Thu, May 16, 2024 at 9:03 PM Thomas Peyric <thomas.pey...@asklocala.com> wrote:
>>
>>> Hi Flink Community !
>>>
>>> I am using:
>>> * Flink
>>> * Flink CDC Postgres connector
>>> * Scala + sbt
>>>
>>> Versions are:
>>>   * orgApacheKafkaVersion = "3.2.3"
>>>   * flinkVersion = "1.19.0"
>>>   * flinkKafkaVersion = "3.0.2-1.18"
>>>   * flinkConnectorPostgresCdcVersion = "3.0.1"
>>>   * debeziumVersion = "1.9.8.Final"
>>>   * scalaVersion = "2.12.13"
>>>   * javaVersion = "11"
>>>
>>>
>>> the problem
>>> -----------
>>>
>>> I have a problem with the heartbeat interval feature:
>>> * when I query PG with `select * from pg_replication_slots;` to check
>>> whether the information for each replication slot is updated at the
>>> defined interval
>>> * the confirmed_flush_lsn values are never updated
>>> PS: I have other replication slots managed directly with Debezium
>>> (without Flink) and their confirmed_flush_lsn values are updated
>>> correctly (same PG database) according to their own intervals
>>>
>>> ```
>>>   slot_name  |  plugin  | slot_type |  datoid   | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn  | confirmed_flush_lsn
>>> -------------+----------+-----------+-----------+----------+-----------+--------+------------+------+--------------+--------------+---------------------
>>>  slot_table1 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10870 |      |   1630392036 | 712/697C0DB8 | 712/697C0DF0
>>>  slot_table2 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10894 |      |   1630392033 | 712/697AD0A8 | 712/697AD0E0
>>>  slot_table3 | pgoutput | logical   | 811518778 | the_db   | f         | t      |      10978 |      |   1630392034 | 712/697AD0A8 | 712/697AD0A8
>>> ```
>>>
>>>
>>>
>>> My setup
>>> --------
>>>
>>> I have configured 3 distinct DataStreamSources on 3 PG database tables
>>> using this common method:
>>>
>>> ```
>>> private def initEntityDataSource(conf: Config, env: StreamExecutionEnvironment,
>>>     entityName: String, columnList: String) = {
>>>
>>>   val dbzProps: Properties = new Properties()
>>>   // e.g. "public.tableX.column1,public.tableX.column2"
>>>   dbzProps.setProperty("column.include.list", columnList)
>>>
>>>   val postgresIncrementalSource: PostgresSourceBuilder.PostgresIncrementalSource[String] =
>>>     PostgresSourceBuilder.PostgresIncrementalSource.builder()
>>>       .hostname(conf.getString("pg.hostname"))
>>>       .port(conf.getInt("pg.port"))
>>>       .database(conf.getString("pg.database"))
>>>       .username(conf.getString("pg.username"))
>>>       .password(conf.getString("pg.password"))
>>>       .slotName(conf.getString(s"flink.${entityName}.slot_name")) // slot_tableX
>>>       .decodingPluginName("pgoutput")
>>>       .includeSchemaChanges(true)
>>>       .deserializer(new JsonDebeziumDeserializationSchema())
>>>       .closeIdleReaders(true)
>>>       .heartbeatInterval(Duration.ofMillis(10000)) // <-- 10 seconds
>>>       .connectTimeout(Duration.ofSeconds(10))      // 10 seconds
>>>       .startupOptions(StartupOptions.initial())
>>>       .schemaList("public")
>>>       .tableList("public." + conf.getString(s"flink.${entityName}.table_name")) // public.tableX
>>>       .debeziumProperties(dbzProps) // <-- dbzProps
>>>       .build()
>>>
>>>   env.fromSource(postgresIncrementalSource,
>>>       WatermarkStrategy.noWatermarks[String](), s"pg-projector-${entityName}")
>>>     .setParallelism(1)
>>> }
>>> ```
>>>
>>> After that, I convert each DataStreamSource into a Table, join those 3
>>> Tables, and convert the result into a DataStream[Row].
>>>
>>> On this new DataStream I do a keyBy and process it with a custom
>>> KeyedProcessFunction.
>>>
>>> All of this works fine and does its job.
>>>
>>> But the heartbeat does not seem to refresh the values in the
>>> pg_replication_slots.confirmed_flush_lsn column.
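>>>
>>> Roughly, the pipeline looks like this (a simplified sketch; field and
>>> class names are illustrative, and it assumes a StreamTableEnvironment
>>> tEnv plus the usual org.apache.flink.table.api._,
>>> org.apache.flink.streaming.api.scala._, and org.apache.flink.types.Row
>>> imports):
>>>
>>> ```
>>> val table1 = tEnv.fromDataStream(stream1)
>>> val table2 = tEnv.fromDataStream(stream2)
>>> val table3 = tEnv.fromDataStream(stream3)
>>>
>>> // Join the three tables on a shared key (field names are illustrative).
>>> val joined = table1
>>>   .join(table2).where($"id1" === $"id2")
>>>   .join(table3).where($"id1" === $"id3")
>>>
>>> val rows: DataStream[Row] = tEnv.toDataStream(joined)
>>>
>>> rows
>>>   .keyBy(_.getFieldAs[String]("id1"))
>>>   .process(new MyKeyedProcessFunction()) // custom KeyedProcessFunction
>>> ```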
>>>
>>>
>>> PS: I also tried this:
>>>
>>> 1) instead of using the .heartbeatInterval() method to set the interval,
>>> I used Debezium properties like this:
>>>
>>> ```
>>> dbzProps.setProperty("heartbeat.interval.ms", "10000") // and also "PT10S"
>>> ```
>>>
>>> This seems to have no effect.
>>>
>>> 2) it seems that Debezium needs to create a Kafka topic to manage the
>>> heartbeat. In theory, if the topic does not exist it will be automatically
>>> created, but my Kafka server does not authorize this auto-creation, so I
>>> created the topic manually with this name:
>>> `__flink-heartbeat.postgres_cdc_source`
>>>
>>> I also added this Debezium property to set the right topic prefix:
>>>
>>> ```
>>> dbzProps.setProperty("topic.heartbeat.prefix", "__flink-heartbeat")
>>> ```
>>>
>>> This seems to have no effect either.
>>>
>>>
>>> So... do you have any ideas?
>>>
>>> Thanks,
>>>
>>> Thomas
>>>
>>
>
> --
>
> Thomas PEYRIC
> BACKEND ENGINEER
> thomas.pey...@asklocala.com
> Docks de la Joliette
> Atrium 10.8, 4eme étage droite.
> 10 Place de la Joliette, 13002 Marseille - FRANCE.
>
>
>
