Hello,

I am trying to get Flink CDC to connect to Oracle 21c XE for testing, but I
never successfully get any rows back from Oracle.

I am using the Oracle 21.3 XE container image from their official registry.
By default it has CDB+PDB architecture.

CDB name is CDB$ROOT
PDB name is XEPDB1

I am using the Flink 1.20.2 tgz package with oracle-cdc 3.4 and ojdbc17.jar,
downloaded from the official website.

Here are my steps:

-- Step 1: Set up XE to enable archive logs
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area'
  scope=spfile;
-- mkdir /opt/oracle/oradata/recovery_area
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;
ALTER DATABASE ARCHIVELOG;
ALTER DATABASE OPEN;

archive log list;

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
SELECT LOG_MODE FROM V$DATABASE;
SELECT supplemental_log_data_min FROM v$database;
SELECT supplemental_log_data_all FROM v$database;

-- Step 2: Create tablespaces

CREATE TABLESPACE logminer_tbs
  DATAFILE '/opt/oracle/oradata/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

alter session set container = XEPDB1;
SHOW CON_NAME;

CREATE TABLESPACE logminer_tbs
  DATAFILE '/opt/oracle/oradata/XE/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

alter session set container = CDB$ROOT;
SHOW CON_NAME;

-- Step 3: create flink user and grant permissions
CREATE USER c##flinkuser IDENTIFIED BY flinkpw
  DEFAULT TABLESPACE logminer_tbs
  QUOTA UNLIMITED ON logminer_tbs
  CONTAINER=ALL;

GRANT CREATE SESSION TO c##flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##flinkuser CONTAINER=ALL;
GRANT LOGMINING TO c##flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##flinkuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$TRANSACTION TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$MYSTAT TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$STATNAME TO c##flinkuser CONTAINER=ALL;
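
-- Optional check, not part of the steps I originally ran: a minimal sketch
-- for listing the system privileges granted above. It assumes the session is
-- still in CDB$ROOT as a privileged user that can read DBA_SYS_PRIVS. Roles
-- such as SELECT_CATALOG_ROLE are listed in DBA_ROLE_PRIVS instead.
SELECT privilege FROM dba_sys_privs
  WHERE grantee = 'C##FLINKUSER' ORDER BY privilege;
SELECT granted_role FROM dba_role_privs
  WHERE grantee = 'C##FLINKUSER' ORDER BY granted_role;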

-- Step 4: Log in as c##flinkuser and create a test table under XEPDB1

alter session set container = XEPDB1;
SHOW CON_NAME;

CREATE TABLE user_table (
    id NUMBER PRIMARY KEY,
    name VARCHAR2(100),
    age NUMBER
);

ALTER TABLE USER_TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

INSERT INTO user_table (id, name, age) VALUES (1, 'Alice', 30);
INSERT INTO user_table (id, name, age) VALUES (2, 'Bob', 25);
INSERT INTO user_table (id, name, age) VALUES (3, 'Charlie', 23);
COMMIT;
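
-- Optional check, not part of the steps I originally ran: a minimal sketch
-- to confirm which schema owns the table and that the three rows are
-- committed, since Step 5 refers to it as C##FLINKUSER.USER_TABLE. It assumes
-- the session is still in XEPDB1 as c##flinkuser.
SELECT owner, table_name FROM all_tables WHERE table_name = 'USER_TABLE';
SELECT COUNT(*) FROM user_table;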

-- Step 5: In flink-sql, connect to the oracle db
CREATE TABLE oracle_source (
  id INT,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = '127.0.0.1',
  'port' = '1521',
  'username' = 'c##flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
  'schema-name' = 'C##FLINKUSER',
  'table-name' = 'USER_TABLE',
  'debezium.database.pdb.name' = 'XEPDB1',
  'debezium.database.tablename.case.insensitive' = 'false',
  'debezium.log.mining.strategy' = 'online_catalog'
);

-- Step 6: Check whether we can select data from Oracle in flink-sql

select * from oracle_source;

There is no error; the result is just empty. Even when I run an INSERT
statement on Oracle, no data shows up in oracle_source.

I am not sure what went wrong. Could anyone offer some suggestions?

Steven.
