Hi. elaloya.

If you want to log some information about the kafka records, you can add
some logs in KafkaRecordEmitter.
If you want to know the information about the deserialized value, you
should add logs in the avro format.

Best,
Hang

elakiya udhayanan <laks....@gmail.com> 于2023年7月19日周三 19:44写道:

> Hi Team,
>
> I am using the upsert-kafka table API connector of Apache Flink to consume
> events from a kafka topic, I want to log the kafka payloads that are
> consumed. Is there a way to log it?
>
> My code looks as below:
>
> EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().inStreamingMode().build();
>         TableEnvironment tEnv = TableEnvironment.create(settings);
> String statement = "CREATE TABLE Employee (\r\n" +
> "  employee  ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (employeeId) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
> ")";
> tEnv.executeSql(statement);
>
> I added log4j.properties to enable log but it did not work. Any help is
> appreciated.
>

Reply via email to