Hi all,
Below are the JARs included in my Flink 2.0 build, followed by the catalog
creation and the query. If I drop Flink to 1.20.2 with the associated JARs,
everything works, but on 2.0 I'm stuck.
*Dockerfile*
RUN echo "--> Install JARs: Flink's S3 plugin" && \
mkdir ./plugins/s3-fs-hadoop && \
mv ./opt/flink-s3-fs-hadoop-2.0.0.jar ./plugins/s3-fs-hadoop/
RUN echo "--> Install Flink JARs: Generic"
COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar
COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
COPY stage/flink-sql-connector-postgres-cdc-3.4.0.jar /opt/flink/lib/flink-sql-connector-postgres-cdc-3.4.0.jar
COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar
# Required by Debezium
COPY stage/flink-sql-json-2.0.0.jar /opt/flink/lib/flink-sql-json-2.0.0.jar
COPY stage/flink-json-2.0.0.jar /opt/flink/lib/stage/flink-json-2.0.0.jar
COPY stage/flink-sql-parquet-2.0.0.jar /opt/flink/lib/flink-sql-parquet-2.0.02.jar
# Paimon
COPY stage/paimon-flink-2.0-1.2.0.jar /opt/flink/lib/paimon-flink-2.0-1.2.0.jar
RUN chown -R flink:flink /opt/flink
*create_cat.sql*
-- Inbound from PostgreSQL via CDC
CREATE CATALOG postgres_catalog WITH
('type'='generic_in_memory');
*Create_inbound_cdc.sql*
CREATE OR REPLACE TABLE postgres_catalog.inbound.children (
id BIGINT -- This is a postgresql Serial generated field
,nationalid VARCHAR(14) -- NOT NULL
,data STRING -- JSONB Payload
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432' -- NOTE: this is the port of the DB inside the container, not the external Docker port exposed via a port mapping.
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'children'
,'slot.name' = 'children'
,'scan.incremental.snapshot.enabled' = 'false' -- experimental feature: incremental snapshot (default of
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
*Flink SQL> select * from postgres_catalog.inbound.children;*
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.streaming.api.functions.source.SourceFunction
Flink SQL>
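
One way to narrow this down is to check whether any JAR under /opt/flink/lib ships the class named in the error, and which JARs reference it. The following is only a rough sketch, not an official Flink tool: the helper names `scan_jar` and `scan_dir` are made up for illustration, and it assumes the class name appears unshaded (not relocated) in the class files it scans.

```python
import sys
import zipfile
from pathlib import Path

# Class from the error above, in the slash-separated form used inside JARs.
MISSING = "org/apache/flink/streaming/api/functions/source/SourceFunction"


def scan_jar(jar_path, class_path=MISSING):
    """Return (defines, referencing) for one JAR.

    defines: True if the JAR ships <class_path>.class itself.
    referencing: class entries whose bytes mention class_path
                 (a plain byte search, so relocated names are not found).
    """
    needle = class_path.encode("utf-8")
    defines = False
    referencing = []
    with zipfile.ZipFile(jar_path) as jar:
        for name in jar.namelist():
            if not name.endswith(".class"):
                continue
            if name == class_path + ".class":
                defines = True
                continue
            if needle in jar.read(name):
                referencing.append(name)
    return defines, referencing


def scan_dir(lib_dir):
    """Scan every *.jar below lib_dir (recursively), keyed by file name."""
    return {p.name: scan_jar(p) for p in sorted(Path(lib_dir).rglob("*.jar"))}


if __name__ == "__main__":
    # Assumed default: the lib directory used in the Dockerfile above.
    lib_dir = sys.argv[1] if len(sys.argv) > 1 else "/opt/flink/lib"
    if not Path(lib_dir).is_dir():
        print(f"{lib_dir} is not a directory")
    else:
        for jar_name, (defines, refs) in scan_dir(lib_dir).items():
            if defines or refs:
                print(f"{jar_name}: defines={defines}, referencing classes={len(refs)}")
```

Run inside the container, for example as `python3 scan_jars.py /opt/flink/lib` (the script name is arbitrary). A JAR that references the class while no JAR defines it would be the first place to look.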
G