[jira] [Commented] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

2023-04-16 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712857#comment-17712857
 ] 

Jark Wu commented on FLINK-31777:
-

I closed the issue because this is a by-design behavior. Feel free to continue 
to discuss if you have further problems. 

> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> 
>
> Key: FLINK-31777
> URL: https://issues.apache.org/jira/browse/FLINK-31777
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.0
> Environment: Flink: 1.16.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>Reporter: Alvin Ge
>Priority: Major
>
> I use debezium send data to kafka with confluent avro format,  when I use 
> 'upsert-kafka' connector, all values are null (primary key has value), but in 
> 'kafka' connector all values are well.
> My upsert-kafka table like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = 
> '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME(this columns is 
> pk)||REC_CREATOR||REC_CREATE_TIME||...||
> |CJ|null|null|null|
> Specified subject still not working.
> {code:java}
> // code placeholder
>  'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key',
>  'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value' {code}
> BTW: All debezium events are READ operation.
> The confluent schemas are here:
> {code:java}
> // code placeholder
> [{
>     "subject": "dev.oracle-key",
>     "version": 1,
>     "id": 1,
>     "schema": 
> "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeKey\"}"
> }, {
>     "subject": "dev.oracle-value",
>     "version": 1,
>     "id": 2,
>     "schema": 
> 

[jira] [Commented] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

2023-04-16 Thread Alvin Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17712853#comment-17712853
 ] 

Alvin Ge commented on FLINK-31777:
--

[~jark] Thank you!

> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> 
>
> Key: FLINK-31777
> URL: https://issues.apache.org/jira/browse/FLINK-31777
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.0
> Environment: Flink: 1.16.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>Reporter: Alvin Ge
>Priority: Major
>
> I use debezium send data to kafka with confluent avro format,  when I use 
> 'upsert-kafka' connector, all values are null (primary key has value), but in 
> 'kafka' connector all values are well.
> My upsert-kafka table like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = 
> '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME(this columns is 
> pk)||REC_CREATOR||REC_CREATE_TIME||...||
> |CJ|null|null|null|
> Specified subject still not working.
> {code:java}
> // code placeholder
>  'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key',
>  'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value' {code}
> BTW: All debezium events are READ operation.
> The confluent schemas are here:
> {code:java}
> // code placeholder
> [{
>     "subject": "dev.oracle-key",
>     "version": 1,
>     "id": 1,
>     "schema": 
> "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeKey\"}"
> }, {
>     "subject": "dev.oracle-value",
>     "version": 1,
>     "id": 2,
>     "schema": 
> 

[jira] [Commented] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

2023-04-13 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711820#comment-17711820
 ] 

Jark Wu commented on FLINK-31777:
-

Hi [~ge.bugman], "upsert-kafka" doesn't support JSON or AVRO in debezium style 
(as you can see, there is a "not insert only" exception). "upsert-kafka" 
deserializes messages using the given format, say "json" or "avro-confluent" in 
your case. For example, REC_CREATOR can be extracted if there is a such a field 
in the first level in the JSON, e.g., 
{code}
{"SUB_SYSTEM_ENAME": "CJ", "REC_CREATOR": "Jark"}
{code}
But the actual "REC_CREATOR" field is nested in "before" or "after" field. 
That's why it's failed to extract the field and get null. 


In your case, it is recommended to use "kafka" connector + 
"debezium-avro-conflluent" format. 

> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> 
>
> Key: FLINK-31777
> URL: https://issues.apache.org/jira/browse/FLINK-31777
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Affects Versions: 1.16.0
> Environment: Flink: 1.16.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>Reporter: Alvin Ge
>Priority: Major
>
> I use debezium send data to kafka with confluent avro format,  when I use 
> 'upsert-kafka' connector, all values are null (primary key has value), but in 
> 'kafka' connector all values are well.
> My upsert-kafka table like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = 
> '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME(this columns is 
> pk)||REC_CREATOR||REC_CREATE_TIME||...||
> |CJ|null|null|null|
> Specified subject still not working.
> {code:java}
> // code placeholder
>  'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key',
>  'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value' {code}
> BTW: All debezium events are READ operation.
> The confluent schemas are here:
> {code:java}
> // code placeholder
> [{
>     "subject": "dev.oracle-key",
>     "version": 1,
>     "id": 1,
>     "schema": 
> "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeKey\"}"
> }, {
>     "subject": "dev.oracle-value",
>     "version": 1,
>     "id": 2,
>     "schema": 
> 

[jira] [Commented] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

2023-04-12 Thread Alvin Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711374#comment-17711374
 ] 

Alvin Ge commented on FLINK-31777:
--

[~martijnvisser] Hello, do you know what json format can use upsert-kafka? I 
tried debezium style json and `value.format` specifie `debezium-json`, I got an 
error debezium-json is not insert only, than I specifie `json` values are still 
null, I don't know how to do this...

Sorry this is my first time to use upsert-kafka.

> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> 
>
> Key: FLINK-31777
> URL: https://issues.apache.org/jira/browse/FLINK-31777
> Project: Flink
>  Issue Type: Improvement
>  Components: kafka
>Affects Versions: 1.16.0
> Environment: Flink: 1.16.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>Reporter: Alvin Ge
>Priority: Major
>
> I use debezium send data to kafka with confluent avro format,  when I use 
> 'upsert-kafka' connector, all values are null (primary key has value), but in 
> 'kafka' connector all values are well.
> My upsert-kafka table like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = 
> '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME(this columns is 
> pk)||REC_CREATOR||REC_CREATE_TIME||...||
> |CJ|null|null|null|
> Specified subject still not working.
> {code:java}
> // code placeholder
>  'key.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-key',
>  'value.avro-confluent.subject' = 'dev.oracle.JNMMM1.TEA02-value' {code}
> BTW: All debezium events are READ operation.
> The confluent schemas are here:
> {code:java}
> // code placeholder
> [{
>     "subject": "dev.oracle-key",
>     "version": 1,
>     "id": 1,
>     "schema": 
> "{\"type\":\"record\",\"name\":\"SchemaChangeKey\",\"namespace\":\"io.debezium.connector.oracle\",\"fields\":[{\"name\":\"databaseName\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"io.debezium.connector.oracle.SchemaChangeKey\"}"
> }, {
>     "subject": "dev.oracle-value",
>     "version": 1,
>     "id": 2,
>     "schema": 
> 

[jira] [Commented] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

2023-04-12 Thread Alvin Ge (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711351#comment-17711351
 ] 

Alvin Ge commented on FLINK-31777:
--

[~martijnvisser] Sorry, that's 1.16 my fault...

> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> 
>
> Key: FLINK-31777
> URL: https://issues.apache.org/jira/browse/FLINK-31777
> Project: Flink
>  Issue Type: Improvement
>  Components: kafka
>Affects Versions: 1.16.0
> Environment: Flink: 1.6.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>Reporter: Alvin Ge
>Priority: Major
>
> I use debezium send data to kafka with confluent avro format,  when I use 
> 'upsert-kafka' connector, all values are null (primary key has value), but in 
> 'kafka' connector all values is well.
> My upsert-kafka table like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = 
> '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||...||
> |CJ|null|null|null|
> BTW: All debezium event are READ operation.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31777) Upsert Kafka use Avro Confluent, key is ok, but all values are null.

2023-04-12 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-31777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711350#comment-17711350
 ] 

Martijn Visser commented on FLINK-31777:


[~ge.bugman] Are you sure you've used Flink 1.6.0 for testing, or was this 
Flink 1.16.0 ?

> Upsert Kafka use Avro Confluent, key is ok, but all values are null.
> 
>
> Key: FLINK-31777
> URL: https://issues.apache.org/jira/browse/FLINK-31777
> Project: Flink
>  Issue Type: Improvement
>  Components: kafka
>Affects Versions: 1.16.0
> Environment: Flink: 1.6.0
> Confluent version: 7.3.3
> Debezium version: 2.1.0/2.0.0
>  
>Reporter: Alvin Ge
>Priority: Major
>
> I use debezium send data to kafka with confluent avro format,  when I use 
> 'upsert-kafka' connector, all values are null (primary key has value), but in 
> 'kafka' connector all values is well.
> My upsert-kafka table like this:
> {code:java}
> // code placeholder
> create table TEA02
> (
>     SUB_SYSTEM_ENAME varchar(255),
>     REC_CREATOR      varchar(255),
>     REC_CREATE_TIME  varchar(255),
>     REC_REVISOR      varchar(255),
>     REC_REVISE_TIME  varchar(255),
>     ARCHIVE_FLAG     varchar(255),
>     SUB_SYSTEM_CNAME varchar(255),
>     SUB_SYSTEM_FNAME varchar(255),
>     SUB_SYSTEM_LEVEL varchar(255),
>     primary key (SUB_SYSTEM_ENAME) not enforced
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'dev.oracle.JNMMM1.TEA02',
>  'properties.bootstrap.servers' = 
> '10.0.170.213:9092,10.0.170.214:9092,10.0.170.215:9092',
>  'properties.group.id' = 'TEA02',
>  'key.format' = 'avro-confluent',
>  'key.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.format' = 'avro-confluent',
>  'value.avro-confluent.url' = 'http://10.0.170.213:8081',
>  'value.fields-include' = 'EXCEPT_KEY'
> ); {code}
> query result:
> ||SUB_SYSTEM_ENAME||REC_CREATOR||REC_CREATE_TIME||...||
> |CJ|null|null|null|
> BTW: All debezium event are READ operation.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)