Re: How to register a customized serializer for the Flink Kafka format type

2021-07-13 Thread Piotr Nowojski
Hi,

It's mentioned in the docs [1], but unfortunately it is not very well
documented in 1.10. In short, you have to provide a custom implementation of
a `DeserializationSchemaFactory`. Please look at the built-in format
factories, such as `JsonRowFormatFactory`, to see how it can be done.
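A rough model of how 1.10 picks the format may help here. As far as I know, `TableFactoryService` loads every factory listed in a `META-INF/services/org.apache.flink.table.factories.TableFactory` file on the classpath and keeps the one whose `requiredContext()` entries all match the table properties. So a custom `DeserializationSchemaFactory` whose `requiredContext()` contains `format.type` = `cutormizeSerializer`, and whose `createDeserializationSchema(Map<String, String> properties)` returns your own `DeserializationSchema`, is what the DDL's `'format.type'` value would select. The sketch below models only that matching step in plain Java. `FormatSelectionModel`, `FormatFactory` and `select` are hypothetical names, not Flink classes, and the model ignores the supported-properties validation and the ambiguity checks Flink also performs.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FormatSelectionModel {

    /** Hypothetical stand-in for a format factory: a name plus its required context. */
    static final class FormatFactory {
        final String name;
        final Map<String, String> requiredContext;

        FormatFactory(String name, Map<String, String> requiredContext) {
            this.name = name;
            this.requiredContext = requiredContext;
        }
    }

    /**
     * Simplified model: returns the first factory whose required context entries
     * are all present, with equal values, in the table properties.
     */
    static Optional<FormatFactory> select(List<FormatFactory> factories,
                                          Map<String, String> tableProperties) {
        return factories.stream()
                .filter(f -> f.requiredContext.entrySet().stream()
                        .allMatch(e -> e.getValue().equals(tableProperties.get(e.getKey()))))
                .findFirst();
    }

    public static void main(String[] args) {
        List<FormatFactory> factories = List.of(
                new FormatFactory("json", Map.of("format.type", "json")),
                new FormatFactory("avro", Map.of("format.type", "avro")),
                // the custom factory that would be listed in the META-INF/services file
                new FormatFactory("custom", Map.of("format.type", "cutormizeSerializer")));

        // A subset of the WITH clause from the DDL in the question.
        Map<String, String> props = Map.of(
                "connector.type", "kafka",
                "connector.version", "0.11",
                "connector.topic", "topic_name",
                "format.type", "cutormizeSerializer");

        System.out.println(select(factories, props).map(f -> f.name).orElse("none"));
    }
}
```

With the properties from the question's DDL, this prints `custom`; the connector keys play no part in the format match.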

In newer versions it's both easier and better documented. For example, in
1.13, please take a look at `DeserializationFormatFactory` and [2].

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/#factories

On Thu, 8 Jul 2021 at 14:21, Chenzhiyuan(HR) wrote:

How to register a customized serializer for the Flink Kafka format type

2021-07-08 Thread Chenzhiyuan(HR)
I created the table below; the data comes from Kafka.
I want to deserialize the JSON messages into a POJO.
However, the message format is neither Avro nor simple JSON.
So I need to know how to register a customized deserializer and use it for the
'format.type' property.
By the way, my Flink version is 1.10.0.
CREATE TABLE MyUserTable (
  uuid VARCHAR,
  orgId VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'topic_name',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'testGroup',
  'format.type' = 'cutormizeSerializer'
)
Below is a sample Kafka message body. Each columnName is a key of the POJO,
and the matching rawData is its value:
{
  "beforeData": [],
  "byteSize": 272,
  "columnNumber": 32,
  "data": [{
    "byteSize": 8,
    "columnName": "APPLY_PERSON_ID",
    "rawData": 10017,
    "type": "LONG"
  }, {
    "byteSize": 12,
    "columnName": "UPDATE_SALARY",
    "rawData": "11000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "UP_AMOUNT",
    "rawData": "1000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 3,
    "columnName": "CURRENCY",
    "rawData": "CNY",
    "type": "STRING"
  }, {
    "byteSize": 32,
    "columnName": "EXCHANGE_RATE",
    "rawData": "1.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 11,
    "columnName": "DEDUCTED_ACCOUNT",
    "rawData": "1000.00",
    "type": "DOUBLE"
  }, {
    "byteSize": 1,
    "columnName": "ENTER_AT_PROCESS",
    "rawData": "Y",
    "type": "STRING"
  }],
  "dataCount": 0,
  "dataMetaData": {
    "connector": "mysql",
    "pos": 1000368076,
    "row": 0,
    "ts_ms": 1625565737000,
    "snapshot": "false",
    "db": "testdb",
    "table": "flow_person_t"
  },
  "key": "APPLY_PERSON_ID",
  "memorySize": 1120,
  "operation": "insert",
  "rowIndex": -1,
  "timestamp": "1970-01-01 00:00:00"
}
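Inside a custom schema's `deserialize(byte[])`, the main work for this message layout is turning the `data` array into column-name/value pairs. The following is a minimal sketch of that step only. It assumes the bytes have already been parsed by some JSON library into nested `Map`/`List` values, and that `LONG`, `DOUBLE` and `STRING` are the only `type` values that matter; `ColumnFlattener`, `flatten` and `convert` are hypothetical helpers. It accepts `rawData` both as a JSON number (`10017`) and as a JSON string (`"11000.00"`), since the sample mixes the two.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnFlattener {

    /**
     * Flattens the "data" array of one message into columnName -> typed value.
     * Assumes each entry carries "columnName", "rawData" and "type" keys.
     */
    static Map<String, Object> flatten(List<Map<String, Object>> data) {
        Map<String, Object> row = new LinkedHashMap<>(); // keeps the message's column order
        for (Map<String, Object> column : data) {
            String name = (String) column.get("columnName");
            Object raw = column.get("rawData");
            String type = (String) column.get("type");
            row.put(name, convert(raw, type));
        }
        return row;
    }

    /** Converts rawData according to its declared type; other types keep the string form. */
    static Object convert(Object raw, String type) {
        if (raw == null) {
            return null;
        }
        String text = raw.toString(); // works for both JSON numbers and JSON strings
        switch (type == null ? "" : type) {
            case "LONG":
                return Long.parseLong(text);
            case "DOUBLE":
                return Double.parseDouble(text);
            default:
                return text; // covers "STRING" and any type not handled here
        }
    }

    public static void main(String[] args) {
        // Three entries from the sample message, as a JSON parser might hand them over.
        List<Map<String, Object>> data = List.of(
                Map.of("columnName", "APPLY_PERSON_ID", "rawData", 10017, "type", "LONG"),
                Map.of("columnName", "UPDATE_SALARY", "rawData", "11000.00", "type", "DOUBLE"),
                Map.of("columnName", "CURRENCY", "rawData", "CNY", "type", "STRING"));
        System.out.println(flatten(data));
    }
}
```

Running `main` prints `{APPLY_PERSON_ID=10017, UPDATE_SALARY=11000.0, CURRENCY=CNY}`. The resulting map would still have to be projected onto the table columns (`uuid`, `orgId`) or the POJO fields; which column name feeds which field is not shown in the sample.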
The POJO class is as follows:
import lombok.Data;

@Data
public class HrSalaryPersonVO {
    private String uuid;
    private String orgId;
    private String unitId;
    private String effectiveDate;

    private int adjustPersonCount;

    private Double adjustAmount;

    private Double beforeSalaryAmount;
    private Double adjustRate;

    private String data0prateType;

    private String status;
}