[jira] [Updated] (FLINK-34044) Kinesis Sink Cannot be Created via TableDescriptor

2024-03-22 Thread Danny Cranmer (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Danny Cranmer updated FLINK-34044:
--
Fix Version/s: aws-connector-4.3.0

> Kinesis Sink Cannot be Created via TableDescriptor
> --
>
> Key: FLINK-34044
> URL: https://issues.apache.org/jira/browse/FLINK-34044
> Project: Flink
> Issue Type: Bug
> Components: Connectors / AWS
> Affects Versions: aws-connector-4.2.0
> Reporter: Tilman Krokotsch
> Assignee: Ahmed Hamdy
> Priority: Major
> Labels: pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> When trying to create a Kinesis Streams sink in the Table API via a
> `TableDescriptor`, I get an error:
> {code:java}
> Caused by: java.lang.UnsupportedOperationException
>     at java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460)
>     at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.removeMappedOptions(KinesisStreamsConnectorOptionsUtils.java:249)
>     at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils$KinesisProducerOptionsMapper.mapDeprecatedClientOptions(KinesisStreamsConnectorOptionsUtils.java:158)
>     at org.apache.flink.connector.kinesis.table.util.KinesisStreamsConnectorOptionsUtils.<init>(KinesisStreamsConnectorOptionsUtils.java:90)
>     at org.apache.flink.connector.kinesis.table.KinesisDynamicTableSinkFactory.createDynamicTableSink(KinesisDynamicTableSinkFactory.java:61)
>     at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
>     ... 20 more
> {code}
> Here is a minimal reproducing example with Flink 1.17.2 and
> flink-connector-kinesis 4.2.0:
> {code:java}
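> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.api.TableDescriptor;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class Job {
>   public static void main(String[] args) throws Exception {
>     // create data stream environment
>     StreamExecutionEnvironment sEnv =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     sEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv);
>
>     // single-column schema shared by the source and the sink
>     Schema a = Schema.newBuilder().column("a", DataTypes.STRING()).build();
>     tEnv.createTemporaryTable(
>         "exampleTable",
>         TableDescriptor.forConnector("datagen").schema(a).build());
>
>     // Kinesis sink defined through a TableDescriptor instead of SQL DDL
>     TableDescriptor descriptor =
>         TableDescriptor.forConnector("kinesis")
>             .schema(a)
>             .format("json")
>             .option("stream", "abc")
>             .option("aws.region", "eu-central-1")
>             .build();
>     tEnv.createTemporaryTable("sinkTable", descriptor);
>     tEnv.from("exampleTable").executeInsert("sinkTable"); // error occurs here
>   }
> }
> {code}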
> From my investigation, the error is triggered by the `ResolvedCatalogTable`
> used when re-mapping the deprecated Kinesis options in
> `KinesisProducerOptionsMapper`. The `getOptions` method of the table returns
> an `UnmodifiableMap`, which rejects the `remove` call made during re-mapping.
> If the sink table is created via SQL, the error does not occur:
> {code:java}
> tEnv.executeSql("CREATE TABLE sinkTable " + descriptor.toString());
> {code}
> because in that case `ResolvedCatalogTable.getOptions` returns a regular `HashMap`.
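
The failure mode described above can be reproduced without Flink. The following is a minimal sketch in plain Java, not the connector's code and not necessarily the fix that was merged: `UnmodifiableOptionsSketch`, `removeMappedOption`, `removalFails`, and the `legacy.option` key are hypothetical names used only for illustration. It shows that `remove` on a map wrapped by `Collections.unmodifiableMap` throws `UnsupportedOperationException`, and that working on a defensive `HashMap` copy avoids the exception.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class UnmodifiableOptionsSketch {

    // Hypothetical stand-in for a step that removes an already re-mapped key.
    static void removeMappedOption(Map<String, String> options, String key) {
        options.remove(key);
    }

    // Returns true if the map rejects the removal with UnsupportedOperationException.
    static boolean removalFails(Map<String, String> options, String key) {
        try {
            removeMappedOption(options, key);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("stream", "abc");
        options.put("aws.region", "eu-central-1");
        options.put("legacy.option", "value"); // hypothetical deprecated key

        // Read-only view, comparable to what the TableDescriptor path hands over.
        Map<String, String> readOnly = Collections.unmodifiableMap(options);
        System.out.println(removalFails(readOnly, "legacy.option")); // prints true

        // A defensive copy is mutable, so the same removal succeeds.
        Map<String, String> copy = new HashMap<>(readOnly);
        System.out.println(removalFails(copy, "legacy.option")); // prints false
        System.out.println(copy.containsKey("legacy.option"));   // prints false
    }
}
```

Copying the options before mutating them keeps the caller's map untouched, which is one plausible way for a factory to stay independent of whether the catalog hands it a mutable or a read-only map.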



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34044) Kinesis Sink Cannot be Created via TableDescriptor

2024-03-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-34044:
---
Labels: pull-request-available  (was: )

> Kinesis Sink Cannot be Created via TableDescriptor
> --
>
> Key: FLINK-34044
> URL: https://issues.apache.org/jira/browse/FLINK-34044
> Project: Flink
> Issue Type: Bug
> Components: Connectors / AWS
> Affects Versions: aws-connector-4.2.0
> Reporter: Tilman Krokotsch
> Priority: Major
> Labels: pull-request-available