Hi Xuyang,

Thanks again for the insights on how to use the DataStream API for my
use case. I will explore it and experiment with it.

I want to use the value inside the ROW data type as the primary key because
I might get multiple records for the same id. When I join this table with
another, similar table for correlation purposes, the join produces numerous
results, which could be avoided if I could make the id inside the employee
object the primary key. Also, I cannot modify the Avro schema, since it is a
standard used across multiple consumers.
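
To make the fan-out concrete, here is a minimal plain-Java sketch (not
Flink code) of what I mean. It assumes two inputs that each carry two
records for the same id. The class JoinFanOut, the Event type, and both
method names are hypothetical and only for illustration. Joining the raw
records pairs every record on one side with every record on the other.
Keeping only the latest record per id first, which is roughly what I
expect a primary key to give me, leaves one result per id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JoinFanOut {

    /** Hypothetical flattened record: the nested employee.id plus one payload value. */
    static final class Event {
        final String id;
        final String value;

        Event(String id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    /** Append-only join: each left record pairs with every right record that has the same id. */
    static List<String> joinAll(List<Event> left, List<Event> right) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.id.equals(r.id)) {
                    out.add(l.id + ":" + l.value + "/" + r.value);
                }
            }
        }
        return out;
    }

    /** Upsert-style view: a later record replaces an earlier one with the same id. */
    static List<Event> latestPerId(List<Event> events) {
        Map<String, Event> latest = new LinkedHashMap<>();
        for (Event e : events) {
            latest.put(e.id, e);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Event> employees = Arrays.asList(
                new Event("123456", "name-v1"), new Event("123456", "name-v2"));
        List<Event> other = Arrays.asList(
                new Event("123456", "dept-v1"), new Event("123456", "dept-v2"));

        // Two records per side for the same id give 2 x 2 joined rows.
        System.out.println(joinAll(employees, other).size());
        // With only the latest record per id on each side, one row remains.
        System.out.println(joinAll(latestPerId(employees), latestPerId(other)));
    }
}
```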

Thanks,
Elakiya

On Wed, Nov 1, 2023 at 3:00 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Elakiya,
> I think you can find what you need here [1], which has many examples of
> bridging the DataStream API and the Table API.
>
> There may be some redundancy here, and I'm not sure this is the best way
> to solve the problem. First, use the StreamTableEnvironment to execute
> your original DDL without the primary key.
> Second, use
> ```
> val table = tEnv
>   .toChangelogStream(tEnv.sqlQuery("select employee.id, employee.name from Employee"))
>   .toTable(tEnv, Schema.newBuilder().column(xxx, xxx).primaryKey(xxx).build());
>
> tEnv.createTemporaryView("Employee2", table);
> ```
> to build the table with the schema you want.
> Third, use
> ```
> tEnv.executeSql(xxx)
> ```
> to execute the DML.
>
> Another good way is to build a separate job that extracts 'employee' into
> its own sink table, and then use that table directly.
>
> BTW, why do you need primary-key semantics?
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/
>
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2023-11-01 15:28:25, "elakiya udhayanan" <laks....@gmail.com> wrote:
>
> Hi Xuyang,
>
> Thank you for your response. Since I do not have access to create a ticket
> in the ASF Jira, I have requested access, and once I get it I will raise a
> ticket for this.
>
> Also, you suggested using the DataStream API to extract the id and then
> using the Table API. Since I have not used the DataStream API before, could
> you share an example if possible? Meanwhile, I will try to learn more about
> using the DataStream API.
>
> Thanks,
> Elakiya
>
> On Tue, Oct 31, 2023 at 7:34 AM Xuyang <xyzhong...@163.com> wrote:
>
>> Hi, Flink SQL doesn't support an inline field in a struct type as a
>> primary key. You can try raising an issue about this feature in the
>> community [1].
>>
>> For a quick solution, you can first transform the data with the DataStream
>> API by extracting the 'id', and then convert it to the Table API to use SQL.
>>
>> [1]
>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-33400?filter=allopenissues
>>
>> --
>>     Best!
>>     Xuyang
>>
>>
>> At 2023-10-30 16:42:03, "elakiya udhayanan" <laks....@gmail.com> wrote:
>>
>> Hi team,
>>
>> I have a Kafka topic named employee, which uses a Confluent Avro schema
>> and emits the payload below:
>>
>> {
>> "employee": {
>> "id": "123456",
>> "name": "sampleName"
>> }
>> }
>> I am using the upsert-kafka connector to consume the events from the
>> above Kafka topic, using the Flink SQL DDL statement below. I want to use
>> the id field as the primary key, but I am unable to do so because it is
>> nested inside the employee object.
>>
>> DDL Statement:
>> String statement = "CREATE TABLE Employee (\r\n" +
>> "  employee  ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\r\n" +
>> ")";
>> Any help is appreciated. TIA.
>>
>> Thanks,
>> Elakiya
>>
>>
