[jira] [Commented] (FLINK-22141) Manually test exactly-once JDBC sink
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324199#comment-17324199 ] Maciej Bryński commented on FLINK-22141:

[~ym] No problem at all. We have this code integrated with 1.12 and want to go to production soon, so we have done a lot of testing.

> Manually test exactly-once JDBC sink
>
> Key: FLINK-22141
> URL: https://issues.apache.org/jira/browse/FLINK-22141
> Project: Flink
> Issue Type: Test
> Components: Connectors / JDBC
> Reporter: Roman Khachatryan
> Assignee: Yuan Mei
> Priority: Blocker
> Labels: pull-request-available, release-testing
> Fix For: 1.13.0
>
> In FLINK-15578, an API and its implementation were added to the JDBC connector to support exactly-once semantics for sinks. The implementation uses JDBC XA transactions.
> The scope of this task is to make sure:
> # The feature is well-documented
> # The API is reasonably easy to use
> # The implementation works as expected
> ## normal case: the database is updated on checkpointing
> ## failure and recovery case: no duplicates inserted, no records skipped
> ## several DBs: PostgreSQL, MSSQL, Oracle (MySQL has a known issue: FLINK-21743)
> ## concurrent checkpoints > 1, DoP > 1
> # Logging is meaningful

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17324166#comment-17324166 ] Yuan Mei commented on FLINK-22141:

[~maver1ck] Thanks so much for reporting this.
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322770#comment-17322770 ] Maciej Bryński commented on FLINK-22141:

Hi, I filed a bug found while testing this in our environment: https://issues.apache.org/jira/browse/FLINK-22311
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322475#comment-17322475 ] Roman Khachatryan commented on FLINK-22141:

You are right, Postgres has the same limitation (it is in the JDBC driver rather than in the DB). I changed the FLINK-22239 type to bug and its fixVersion to 1.13 for now. I've also published a draft PR to address the limitation.
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322252#comment-17322252 ] Yuan Mei commented on FLINK-22141:

So I think Postgres has the same problem as MySQL: it does not allow multiple XA transactions within one connection. cc [~roman_khachatryan]
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322248#comment-17322248 ] Yuan Mei commented on FLINK-22141:

Please check the PGXAConnection source code here: https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/xa/PGXAConnection.java

{code:java}
private void commitPrepared(Xid xid) throws XAException {
  try {
    // Check preconditions. The connection mustn't be used for another
    // XA or local transaction, or the COMMIT PREPARED command
    // would mess it up.
    if (state != State.IDLE || conn.getTransactionState() != TransactionState.IDLE) {
      throw new PGXAException(
          GT.tr(
              "Not implemented: 2nd phase commit must be issued using an idle connection. commit xid={0}, currentXid={1}, state={2}, transactionState={3}",
              xid, currentXid, state, conn.getTransactionState()),
          XAException.XAER_RMERR);
    }
{code}
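For context, the XA two-phase flow that the sink performs against the driver looks roughly like the following. This is a minimal sketch using only the standard `javax.sql` / `javax.transaction.xa` interfaces; the connection handling is simplified, the table name is illustrative, and the `Xid` is assumed to be supplied by the caller:

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

public class XaTwoPhaseSketch {

    // Runs one XA transaction branch end-to-end against the given data source.
    static void writeExactlyOnce(XADataSource ds, Xid xid, long value) throws Exception {
        XAConnection xaConn = ds.getXAConnection();
        XAResource xa = xaConn.getXAResource();

        // Do the work inside an XA transaction branch.
        xa.start(xid, XAResource.TMNOFLAGS);
        try (Connection conn = xaConn.getConnection();
                PreparedStatement ps = conn.prepareStatement("INSERT INTO t (v) VALUES (?)")) {
            ps.setLong(1, value);
            ps.executeUpdate();
        }
        xa.end(xid, XAResource.TMSUCCESS);

        // Phase 1: prepare (on checkpoint); the DB persists the pending transaction.
        xa.prepare(xid);

        // Phase 2: commit (after the checkpoint completes). With pgjdbc this call
        // fails unless the connection is idle, i.e. no other XA branch is active
        // on it, which is exactly the precondition checked in commitPrepared() above.
        xa.commit(xid, false);
        xaConn.close();
    }
}
```

The failure reported below matches the case where phase 2 is attempted while a new branch is already active on the same connection.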
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322247#comment-17322247 ] Yuan Mei commented on FLINK-22141:

Manual test case: https://github.com/apache/flink/pull/15627, connecting to a Postgres sink, parallelism: 12.

Issues found:

1. We should set max_prepared_transactions to at least sink parallelism * 2 (with max_prepared_transactions = 20, I was getting an "exceeded max_prepared_transactions" error).

2. With max_prepared_transactions = 50:

{code:java}
2021-04-15 17:50:29,180 WARN org.apache.flink.runtime.taskmanager.Task [] - Sink: Unnamed (2/12)#0 (e6a72f61a8b315d1a7a7f5ff2e160f69) switched from RUNNING to FAILED with failure cause:
org.apache.flink.util.FlinkRuntimeException: failed to commit 1 transactions out of 1
    at org.apache.flink.connector.jdbc.xa.XaGroupOps$GroupXaOperationResult.wrapFailure(XaGroupOps.java:66)
    at org.apache.flink.connector.jdbc.xa.XaGroupOps$GroupXaOperationResult.lambda$throwIfAnyFailed$0(XaGroupOps.java:88)
    at java.base/java.util.Optional.map(Optional.java:265)
    at org.apache.flink.connector.jdbc.xa.XaGroupOps$GroupXaOperationResult.throwIfAnyFailed(XaGroupOps.java:86)
    at org.apache.flink.connector.jdbc.xa.XaGroupOpsImpl.commit(XaGroupOpsImpl.java:66)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.commitUpToCheckpoint(JdbcXaSinkFunction.java:313)
    at org.apache.flink.connector.jdbc.xa.JdbcXaSinkFunction.notifyCheckpointComplete(JdbcXaSinkFunction.java:245)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:330)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1137)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:1102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:1125)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:657)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkRuntimeException: unable to commit XA transaction, xid: 201:4315602f63ad3306a71b48933ff73a800100:400443a7, error -3: resource manager error has occurred. [Not implemented: 2nd phase commit must be issued using an idle connection. commit xid=201:4315602f63ad3306a71b48933ff73a800100:400443a7, currentXid=201:4315602f63ad3306a71b48933ff73a8001000200:400443a7, state=ACTIVE, transactionState=IDLE]
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.wrapException(XaFacadeImpl.java:356)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnableRecoverByWarn$1(XaFacadeImpl.java:310)
    at java.base/java.util.Optional.orElseThrow(Optional.java:408)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnableRecoverByWarn$2(XaFacadeImpl.java:308)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl$Command.lambda$fromRunnable$4(XaFacadeImpl.java:327)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.execute(XaFacadeImpl.java:267)
    at org.apache.flink.connector.jdbc.xa.XaFacadeImpl.commit(XaFacadeImpl.java:195)
    at org.apache.flink.connector.jdbc.xa.XaGroupOpsImpl.commit(XaGroupOpsImpl.java:57)
    ... 18 more
{code}
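The first issue above is a Postgres server setting. A sketch of how one might inspect and raise it (this assumes superuser access via psql; max_prepared_transactions only takes effect after a server restart):

```shell
# Show the current limit; each XA "prepare" consumes one slot until commit/rollback.
psql -U postgres -c "SHOW max_prepared_transactions;"

# Raise it to at least (sink parallelism * 2), e.g. 50 for parallelism 12,
# then restart the server for the change to take effect.
psql -U postgres -c "ALTER SYSTEM SET max_prepared_transactions = 50;"
pg_ctl restart -D "$PGDATA"
```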
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322032#comment-17322032 ] Yuan Mei commented on FLINK-22141:

Cool, after testing, I can pick up some of them!
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322014#comment-17322014 ] Roman Khachatryan commented on FLINK-22141:

Thanks a lot [~ym], I agree with all the points you raised and created tickets to address them: FLINK-22288 (API), FLINK-22289 (docs).
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17322006#comment-17322006 ] Yuan Mei commented on FLINK-22141:

[~roman_khachatryan]
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321934#comment-17321934 ] Yuan Mei commented on FLINK-22141:

A couple of things related to documentation, found while writing test cases following https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/

1. Within the "JDBC Connector" section, I think we need to provide explicit instructions for adding the JDBC driver, similar to the "JDBC SQL Connector" page (https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/), instead of simply: "To use it, add the following dependency to your project (along with your JDBC driver)".

2. For JdbcSink.sink, I can set up a pipeline running in a cluster connecting to Postgres and verify results following the instructions, so this part looks fine to me.

3. For JdbcSink.exactlyOnceSink, a couple of suggestions:

3.1 Change "getDbMetadata().getUrl()" and "getDbMetadata().getDriverClass()" to something more user-friendly, because I cannot find anywhere in the doc what getDbMetadata() is. We could change it to something like the following:

{code:java}
.withUrl("jdbc:postgresql://localhost:5432/postgres")
.withDriverName("org.postgresql.Driver")
{code}

3.2 JdbcSink.exactlyOnceSink has an input arg `JdbcConnectionOptions connectionOptions` that is never used, so I guess the connection info is provided by the XADataSource. Should we consider removing `JdbcConnectionOptions connectionOptions`? It is not used and may confuse users.

3.3 Provide an example of how to create an XADataSource for each supported DB.
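For Postgres, the example requested in 3.3 might look like the following. This is a hedged sketch against the Flink 1.13 `JdbcSink.exactlyOnceSink` API and pgjdbc's `PGXADataSource`; the URL, credentials, table name, and statement are placeholders, not a definitive implementation:

```java
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.postgresql.xa.PGXADataSource;

public class PostgresXaSinkExample {

    // Builds an exactly-once JDBC sink backed by Postgres XA transactions.
    static SinkFunction<Long> buildSink() {
        return JdbcSink.exactlyOnceSink(
                "INSERT INTO test_table (id) VALUES (?)",
                (ps, value) -> ps.setLong(1, value),
                JdbcExecutionOptions.defaults(),
                JdbcExactlyOnceOptions.defaults(),
                () -> {
                    // The supplier is called per sink subtask, so each subtask
                    // gets its own XADataSource (and hence its own connection).
                    PGXADataSource ds = new PGXADataSource();
                    ds.setUrl("jdbc:postgresql://localhost:5432/postgres");
                    ds.setUser("postgres");
                    ds.setPassword("postgres");
                    return ds;
                });
    }
}
```

The sink returned here can then be attached with `stream.addSink(PostgresXaSinkExample.buildSink())` in a job whose checkpointing is enabled, since commits happen on checkpoint completion.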
[ https://issues.apache.org/jira/browse/FLINK-22141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17319900#comment-17319900 ] Yuan Mei commented on FLINK-22141:

TODO list:
1. Document the limitation of MySQL XA transaction support; refer to FLINK-22239 (Improve support for JdbcXaSinkFunction).