Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-08 Thread Theodor Wübker
Thanks for your reply Yaroslav! The way I do it with Avro seems similar to what you pointed out: ResolvedSchema resultSchema = resultTable.getResolvedSchema(); DataType type = resultSchema.toSinkRowDataType(); org.apache.avro.Schema converted = AvroSchemaConverter.convertToSchema(type.getLogicalType()); …
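
A minimal, self-contained sketch of the conversion described above, assuming Flink's Table API plus the flink-avro module on the classpath; the surrounding class and method names are placeholders:

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.DataType;

public class SchemaConversion {
    // Derive an Avro schema from the resolved schema of a query result table.
    static Schema toAvroSchema(Table resultTable) {
        ResolvedSchema resultSchema = resultTable.getResolvedSchema();
        DataType type = resultSchema.toSinkRowDataType();
        return AvroSchemaConverter.convertToSchema(type.getLogicalType());
    }
}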

Re: How to write custom serializer for dynamodb connector

2022-11-08 Thread Matt Fysh
Thanks Hong, I moved the AttributeValue creation into the ElementConverter and it started working without any custom serde work! The reason for creating AttributeValue instances in a previous operator is that I was closely following the example code: https://github.com/apache/flink-connector-aws/b…
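
A sketch of the fix Matt describes, assuming the flink-connector-dynamodb sink and a hypothetical MyEvent POJO; building the AttributeValue map inside the ElementConverter keeps plain POJOs on the wire between operators, so no custom serializer is needed:

import java.util.Map;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// Hypothetical event type flowing through the job as a plain POJO.
class MyEvent {
    public String id;
    public long count;
}

public class MyEventConverter implements ElementConverter<MyEvent, DynamoDbWriteRequest> {
    @Override
    public DynamoDbWriteRequest apply(MyEvent event, SinkWriter.Context context) {
        // AttributeValue instances are created here, at the sink boundary,
        // so they never need to be serialized between operators.
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(Map.of(
                        "id", AttributeValue.builder().s(event.id).build(),
                        "count", AttributeValue.builder().n(String.valueOf(event.count)).build()))
                .build();
    }
}

The converter is then wired in via DynamoDbSink.<MyEvent>builder().setElementConverter(new MyEventConverter()), alongside the table name, as in the repository example.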

Re: How to write custom serializer for dynamodb connector

2022-11-08 Thread Teoh, Hong
Hi Matt, First of all, awesome that you are using the DynamoDB sink! To resolve your issue with serialization in the DDB sink, you are right, the issue only happens when you create the AttributeValue object in a previous operator and send it to the sink. The issue here is with serializing of Im…

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-08 Thread Yaroslav Tkachenko
Got it. I'm not sure why you mentioned ResolvedSchema in the first place; usually in the Table API you work with RowType / RowData. - For Avro I use the AvroSchemaConverter.convertToSchema method to get an Avro schema from a RowType, then use org.apache.flink.formats.avro.RowDataToAvroConverters to se…
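
A sketch of that Avro path, assuming flink-avro on the classpath; rowType and rowData stand in for the query's RowType and one of its RowData records:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.RowDataToAvroConverters;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

public class AvroSerialization {
    // Derive the Avro schema from the RowType, then convert one RowData
    // record into an Avro GenericRecord with the matching runtime converter.
    static GenericRecord toAvro(RowType rowType, RowData rowData) {
        Schema avroSchema = AvroSchemaConverter.convertToSchema(rowType);
        RowDataToAvroConverters.RowDataToAvroConverter converter =
                RowDataToAvroConverters.createConverter(rowType);
        return (GenericRecord) converter.convert(avroSchema, rowData);
    }
}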

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-08 Thread Yaroslav Tkachenko
Hey Theo, have you looked at the flink-json and flink-protobuf packages? On Tue, Nov 8, 2022 at 5:21 AM Theodor Wübker wrote: > Hello, > > I have a streaming use case, where I execute a query on a Table. I take > the ResolvedSchema of the table and convert it to an Avro-Schema using the > AvroSchemaConverter. …

How to write custom serializer for dynamodb connector

2022-11-08 Thread Matt Fysh
I'm attempting to use the dynamodb sink located at https://github.com/apache/flink-connector-aws The example in the repo is working …

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread Maximilian Michels
Taking a step back here: I think this needs to be handled in the application mode in any case. Even if we had a better parser, it would still treat # as a comment char. The application mode needs to be fixed to come up with an escape scheme. YAML supports this via \# but that won't work with our parser …

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread Maximilian Michels
The job fails when starting because its arguments are passed through the Flink configuration in application deployment mode. > This is a known limit of the current Flink options parser. Refer to FLINK-15358[1] for more information. Exactly. The issue stems from the GlobalConfiguration#loadYAMLResource …
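
An illustrative sketch (not the actual Flink source) of why a # in a value is lost: a line-based reader in the spirit of GlobalConfiguration splits each line at the first # before parsing the key/value pair, so everything after it is dropped as a comment. The option key below is made up:

public class CommentStripping {
    public static void main(String[] args) {
        // A config line whose value legitimately contains '#'.
        String line = "my.option.args: --filter contains-#-char";
        // Comment handling in the spirit of the Flink YAML reader:
        // everything from the first '#' onward is treated as a comment.
        String[] parts = line.split("#", 2);
        String conf = parts[0].trim();
        System.out.println(conf); // prints "my.option.args: --filter contains-" -- the rest is gone
    }
}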

Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-08 Thread Theodor Wübker
Hello, I have a streaming use case, where I execute a query on a Table. I take the ResolvedSchema of the table and convert it to an Avro-Schema using the AvroSchemaConverter. Now I want to do the same for JSON and Protobuf. However, it seems there is nothing similar to AvroSchemaConverter - I …

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread liuxiangcao
Hi Yang, Do you think flink-conf not supporting `#` in FLINK-15358[1] and the Flink job spec not supporting `#` are caused by some common code? Or maybe they are in different code paths? My first guess was that they are in different code paths. The flink-conf is parsed when starting the Flink cluster w…

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread liuxiangcao
Hi Gyula, Thanks for getting back. Could you share how to submit a job to the flink-kubernetes-operator in JSON format? We use the Java Fabric8 K8s client, which serializes Java FlinkDeployment objects to CustomResource YAML (see the code snippet below). Since `#` is considered a special character denoting comments …
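
A hedged sketch of that submission path, assuming the operator's CRD model classes (FlinkDeployment, FlinkDeploymentSpec, JobSpec) and a Fabric8 6.x client; the names and jar URI are placeholders. Fabric8 serializes the object to YAML on submission; it is the cluster-side config parsing where a # in args collides with comment handling:

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;

public class SubmitDeployment {
    public static void main(String[] args) {
        FlinkDeployment deployment = new FlinkDeployment();
        deployment.setMetadata(new ObjectMetaBuilder().withName("my-job").build());

        // Job arguments: the '#' below is what ends up mangled on the cluster side.
        JobSpec job = new JobSpec();
        job.setJarURI("local:///opt/flink/usrlib/my-job.jar");
        job.setArgs(new String[] {"--filter", "contains-#-char"});

        FlinkDeploymentSpec spec = new FlinkDeploymentSpec();
        spec.setJob(job);
        deployment.setSpec(spec);

        // The Fabric8 client serializes the custom resource on submit.
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            client.resources(FlinkDeployment.class).inNamespace("default").createOrReplace(deployment);
        }
    }
}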

Re: Kinesis Connector does not work

2022-11-08 Thread Chesnay Schepler
This is a general thing; see https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/configuration/connector/ The Python documentation isn't particularly clear on how to use Java connectors. The easiest thing would be to use the "sql-*" connector jars, I guess. On 08/11/2022 11:49 …

Re: Kinesis Connector does not work

2022-11-08 Thread Matt Fysh
Ok thanks, will give that a try. Is that something that should be added to the Kinesis connector docs page? There are existing instructions there for adding the flink-connector-kinesis jar as a dependency, but no instructions for adding commons-logging. Or if this is something more general, it might …

Re: support escaping `#` in flink job spec in Flink-operator

2022-11-08 Thread Yang Wang
This is a known limit of the current Flink options parser. Refer to FLINK-15358[1] for more information. [1] https://issues.apache.org/jira/browse/FLINK-15358 Best, Yang Gyula Fóra wrote on Tue, Nov 8, 2022 at 14:41: > It is also possible that this is a problem of the Flink native Kubernetes > integration …

Re: Kinesis Connector does not work

2022-11-08 Thread Chesnay Schepler
Said dependency (on commons-logging) is not meant to be provided by the docker image, but bundled in your user-jar (along with the connector). On 08/11/2022 02:14, Matt Fysh wrote: Hi, I'm following the kinesis connector instructions as documented here: https://nightlies.apache.org/flink/flink…