Re: Flink-SQL returning duplicate rows for some records
Hi Leonard and Martijn,

Thanks for looking into this.

I ran into the issue on Flink 1.14.4 (with the matching flink-sql-connector-kafka based on Scala 2.11), and reproduced the problem today on 1.15.0 (again with the matching flink-sql-connector-kafka). I haven't used versions older than 1.14.4.

The following debezium-json messages illustrate the problem. Note that they're published without schema and that they're all produced to Kafka with this message key:

    {"id":1}

These are the message values. First, for an INSERT:

    {"before":null,"after":{"id":1,"done":false,"name":"Initial value"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104409527,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:0528:001c","commit_lsn":"0025:0528:001d","event_serial_no":1},"op":"c","ts_ms":1652104413976,"transaction":null}

Then an UPDATE on the text field:

    {"before":{"id":1,"done":false,"name":"Initial value"},"after":{"id":1,"done":false,"name":"Updated #1"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104502837,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:05d8:0002","commit_lsn":"0025:05d8:0003","event_serial_no":2},"op":"u","ts_ms":1652104503260,"transaction":null}

Then an UPDATE on the boolean field -- this causes a duplicated row for id=1:

    {"before":{"id":1,"done":false,"name":""},"after":{"id":1,"done":true,"name":"Updated #1"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104507080,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:05f0:0002","commit_lsn":"0025:05f0:0003","event_serial_no":2},"op":"u","ts_ms":1652104508248,"transaction":null}

Another UPDATE on the text field -- this causes an update of the text field in the second instance of the id=1 row:

    {"before":{"id":1,"done":true,"name":"Updated #1"},"after":{"id":1,"done":true,"name":"Updated #2"},"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104511600,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:0608:0002","commit_lsn":"0025:0608:0003","event_serial_no":2},"op":"u","ts_ms":1652104513257,"transaction":null}

And finally a DELETE -- this causes the deletion of the second row with id=1, but not the first:

    {"before":{"id":1,"done":true,"name":"Updated #2"},"after":null,"source":{"version":"1.9.2.Final","connector":"sqlserver","name":"mssql","ts_ms":1652104514893,"snapshot":"false","db":"todo","sequence":null,"schema":"dbo","table":"todo_list","change_lsn":"0025:0620:0002","commit_lsn":"0025:0620:0005","event_serial_no":1},"op":"d","ts_ms":1652104518749,"transaction":null}

(Debezium then produces a tombstone record with the same key `{"id":1}` and value `null`.)

For reference, this is the CREATE TABLE statement for the source connector:

    CREATE TABLE todo_list (
        id BIGINT,
        done BOOLEAN,
        name STRING
    )
    WITH (
        'connector'='kafka',
        'topic'='mssql.dbo.todo_list',
        'properties.bootstrap.servers'='10.88.10.10:9092',
        'properties.group.id'='flinksql-todo-list',
        'scan.startup.mode'='earliest-offset',
        'key.format'='json',
        'key.fields'='id',
        'value.format'='debezium-json',
        'value.debezium-json.schema-include'='false',
        'value.fields-include'='EXCEPT_KEY'
    );

Please let me know if there's anything else I can do to clear this up.

Kind regards,
Joost Molenaar
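One detail in the five messages above is worth isolating: in the third message the "before" image carries an empty name (""), while the row at that point has the name "Updated #1". The following Python sketch is a simplified model, not Flink's actual implementation. It assumes that each "u" event is split into a retraction of "before" plus an insertion of "after", and it compares two hypothetical ways of materializing the result: removing old rows by matching every column, and upserting by the `id` key. The function names and the abbreviated `EVENTS` list are made up for illustration.

```python
# Abbreviated copies of the five Debezium envelopes from the message above;
# only "op", "before" and "after" are kept.
EVENTS = [
    {"op": "c", "before": None,
     "after": {"id": 1, "done": False, "name": "Initial value"}},
    {"op": "u", "before": {"id": 1, "done": False, "name": "Initial value"},
     "after": {"id": 1, "done": False, "name": "Updated #1"}},
    {"op": "u", "before": {"id": 1, "done": False, "name": ""},
     "after": {"id": 1, "done": True, "name": "Updated #1"}},
    {"op": "u", "before": {"id": 1, "done": True, "name": "Updated #1"},
     "after": {"id": 1, "done": True, "name": "Updated #2"}},
    {"op": "d", "before": {"id": 1, "done": True, "name": "Updated #2"},
     "after": None},
]


def to_changelog(event):
    """Split one envelope into (kind, row) changelog entries."""
    op = event["op"]
    if op in ("c", "r"):
        return [("+I", event["after"])]
    if op == "u":
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":
        return [("-D", event["before"])]
    raise ValueError(f"unexpected op: {op!r}")


def as_tuple(row):
    return (row["id"], row["done"], row["name"])


def apply_by_full_row(events):
    """Assumed model: a retraction removes one row whose columns all match."""
    rows = []
    for event in events:
        for kind, row in to_changelog(event):
            candidate = as_tuple(row)
            if kind in ("+I", "+U"):
                rows.append(candidate)
            elif candidate in rows:
                rows.remove(candidate)
            # A retraction that matches no row is dropped without effect.
    return rows


def apply_by_key(events):
    """Assumed model: rows are upserted and deleted by the 'id' key only."""
    table = {}
    for event in events:
        for kind, row in to_changelog(event):
            if kind in ("+I", "+U"):
                table[row["id"]] = as_tuple(row)
            else:
                table.pop(row["id"], None)
    return list(table.values())


print(apply_by_full_row(EVENTS[:3]))  # two rows with id=1 after the third event
print(apply_by_full_row(EVENTS))      # [(1, False, 'Updated #1')]
print(apply_by_key(EVENTS))           # []
```

Under the full-row model the sketch ends up with the same leftover row that was reported, which suggests (but does not prove) that the mismatched "before" image is involved. Whether Flink behaves this way in this setup would still need to be confirmed.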
Re: Flink-SQL returning duplicate rows for some records
Hi Joost,

Could you share your Flink version and the two records in debezium-json format that were produced by the two MS SQL UPDATE statements?

Best,
Leonard
Re: Flink-SQL returning duplicate rows for some records
Hi Joost,

I'm looping in Leonard and Jark, who might be able to help out here.

Best regards,
Martijn
Flink-SQL returning duplicate rows for some records
Hello all,

I'm trying to use Flink-SQL to monitor a Kafka topic that's populated by Debezium, which is in turn monitoring an MS-SQL CDC table. For some reason, Flink-SQL shows a new row when I update the boolean field, but updates the row in place when I update the text field, and I don't understand why this happens. My ultimate goal is to use Flink-SQL to do a join on records that come from both sides of a 1:N relation in the foreign database, to expose a more ready-to-consume JSON object to downstream consumers.

The source table is defined like this in MS-SQL:

    CREATE TABLE todo_list (
        id int IDENTITY NOT NULL,
        done bit NOT NULL DEFAULT 0,
        name varchar(MAX) NOT NULL,
        CONSTRAINT PK_todo_list PRIMARY KEY (id)
    );

This is the configuration I'm sending to Debezium. Note that I'm not including the JSON schema in either keys or values:

    {
        "name": "todo-connector",
        "config": {
            "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
            "tasks.max": "1",
            "database.server.name": "mssql",
            "database.hostname": "10.88.10.1",
            "database.port": "1433",
            "database.user": "sa",
            "database.password": "...",
            "database.dbname": "todo",
            "database.history.kafka.bootstrap.servers": "10.88.10.10:9092",
            "database.history.kafka.topic": "schema-changes.todo",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": false,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": false
        }
    }

So Debezium is publishing events to Kafka with keys like this:

    {"id":3}

And values like this (whitespace added for readability); this one is updating the value of the 'name' field:

    {
        "before": {
            "id": 3,
            "done": false,
            "name": "test"
        },
        "after": {
            "id": 3,
            "done": false,
            "name": "test2"
        },
        "source": {
            "version": "1.9.0.Final",
            "connector": "sqlserver",
            "name": "mssql",
            "ts_ms": 1651497653043,
            "snapshot": "false",
            "db": "todo",
            "sequence": null,
            "schema": "dbo",
            "table": "todo_list",
            "change_lsn": "0025:0d58:0002",
            "commit_lsn": "0025:0d58:0003",
            "event_serial_no": 2
        },
        "op": "u",
        "ts_ms": 1651497654127,
        "transaction": null
    }

(I verified this using a Python script that follows the relevant Kafka topic.)

Next, I'm trying to follow this CDC stream in Flink by adding the Kafka connector for Flink SQL, defining a source table and starting a job in the Flink-SQL CLI:

    ADD JAR '/opt/flink/opt/flink-sql-connector-kafka_2.11-1.14.4.jar';

    CREATE TABLE todo_list (
        k_id BIGINT,
        done BOOLEAN,
        name STRING
    )
    WITH (
        'connector'='kafka',
        'topic'='mssql.dbo.todo_list',
        'properties.bootstrap.servers'='10.88.10.10:9092',
        'properties.group.id'='flinksql-todo-list',
        'scan.startup.mode'='earliest-offset',
        'key.format'='json',
        'key.fields-prefix'='k_',
        'key.fields'='k_id',
        'value.format'='debezium-json',
        'value.debezium-json.schema-include'='false',
        'value.fields-include'='EXCEPT_KEY'
    );

    SELECT * FROM todo_list;

Now, when I perform a query like this in the MS-SQL database:

    UPDATE todo_list SET name='test2' WHERE id=3;

I see that the Flink-SQL client updates the row with id=3 to have the new value "test2" for the 'name' field, as I was expecting. However, when I update the 'done' field to have a different value, Flink-SQL seems to leave the old row with values (3, False, 'test2') intact, and shows a new row with values (3, True, 'test2').

I tried to add a `PRIMARY KEY (k_id) NOT ENFORCED` line between the first parentheses in the CREATE TABLE statement, but this seems to make no difference, except when running `DESCRIBE todo_list` in Flink-SQL.

I have no idea why the boolean field would cause different behavior than the text field. Am I missing some piece of configuration, or are my expectations wrong?

Regards,
Joost Molenaar