Re: Getting exception when writing to parquet file with generic types disabled
Hi Aniket,

Currently the filesystem connector does not support the option 'pipeline.generic-types'='false', because the connector emits `PartitionCommitInfo` messages to the downstream partition committer operator even when there are no partitions in the sink table. `PartitionCommitInfo` contains a `List<String> partitions` field, which is what causes the exception you mentioned in the thread. I have created an issue [1] for this.

[1] https://issues.apache.org/jira/browse/FLINK-32129

Best,
Shammon FY

On Thu, May 18, 2023 at 9:20 PM Aniket Sule wrote:
> Hi,
>
> I am trying to write data to parquet files using SQL insert statements. Generic types are disabled in the execution environment.
>
> There are other queries running in the same job that are counting/aggregating data. Generic types are disabled as a performance optimization for those queries.
>
> In this scenario, whenever I try to insert data into parquet files, I get an exception:
>
> Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.
>
> I get the above exception even when I test with a simple table that has no array or list data types.
>
> Is there any way to write parquet files with generic types disabled?
>
> Thanks and regards,
> Aniket Sule.
>
> Here is a way to reproduce what I am seeing. My actual source is Kafka with data in JSON format. Datagen is simply to quickly reproduce the scenario. The environment is Flink 1.17.0. I am using the SQL CLI.
> set 'sql-client.verbose'='true';
> set 'table.exec.source.idle-timeout'='1000';
> set 'table.optimizer.join-reorder-enabled'='true';
> set 'table.exec.mini-batch.enabled'='true';
> set 'table.exec.mini-batch.allow-latency'='5 s';
> set 'table.exec.mini-batch.size'='5000';
> set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
> set 'table.optimizer.distinct-agg.split.enabled'='true';
> set 'table.exec.state.ttl'='360 s';
> set 'pipeline.object-reuse'='true';
> set 'pipeline.generic-types'='false';
> set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';
>
> CREATE TABLE source_t (
>   order_number BIGINT,
>   order_name STRING,
>   risk FLOAT,
>   order_time TIMESTAMP(3)
> ) WITH (
>   'connector' = 'datagen'
> );
>
> CREATE TABLE file_t (
>   order_number BIGINT,
>   order_name STRING,
>   risk FLOAT,
>   `year` STRING, `month` STRING, `day` STRING, `hour` STRING
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '/tmp/data',
>   'format' = 'parquet'
> );
>
> INSERT INTO file_t
> SELECT order_number, order_name, risk,
>   date_format(order_time, 'yyyy') AS `year`,
>   date_format(order_time, 'MM') AS `month`,
>   date_format(order_time, 'dd') AS `day`,
>   date_format(order_time, 'HH') AS `hour`
> FROM source_t;
>
> Resulting exception:
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.,
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to fetchResults.
>     at org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
>     at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>     at org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>     at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>     at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>     at java.base/java.util.Optional.ifPresent(Optional.java:183)
>     at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>     at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>     at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>     at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     ...
Python UDF with Flink SQL
Hi,

I am trying to create a Flink SQL program using a Python UDF with metrics.

This is my sample Python file, custom_udf_2.py:

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):
    def __init__(self):
        self.counter = None

    def open(self, function_context):
        self.counter = function_context.get_metric_group().counter("my_counter")

    def eval(self, x, y):
        self.counter.inc()
        return x + y
```

This is my SQL script:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF' LANGUAGE PYTHON;

CREATE TABLE datagen (
    a BIGINT,
    b BIGINT
) WITH (
    'connector' = 'datagen',
    'fields.a.kind'='sequence',
    'fields.a.start'='1',
    'fields.a.end'='8',
    'fields.b.kind'='sequence',
    'fields.b.start'='4',
    'fields.b.end'='11'
);

CREATE TABLE print_sink (
    `sum` BIGINT
) WITH (
    'connector' = 'print'
);

INSERT INTO print_sink (
    SELECT add(a, b) FROM datagen
);
```

When I try to execute this program, I get the following:

```
/bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py
Flink SQL> [INFO] Execute statement succeed.
... (the CREATE TABLE statements succeed as well) ...
Flink SQL> INSERT into print_sink (
>   select add(a,b) FROM datagen
> )
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'custom_udf_2.MyUDF' failed.
```

I've tried multiple variations:

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF' LANGUAGE PYTHON;
CREATE FUNCTION add AS 'MyUDF' LANGUAGE PYTHON;
```

FYI, this is on Flink 1.16.1 and Python 3.9.13. Admittedly, I haven't found anything about this usage in the official documentation.

Is this use case currently supported? I know that it works with SQL if I change the add function to:

```
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j
```

but then it doesn't create any metrics.

Does anyone have any idea how I can get this to work, specifically with Flink SQL and Python UDF metrics?

Thanks,
Tom
Question about the table.exec.source.cdc-events-duplicate option
MySQL binlog change records are published to a Kafka topic in Canal JSON format, and a Flink SQL job replicates them in real time into another MySQL database. Today the job failed with a duplicate key error when writing to MySQL. Investigating, I found the Kafka topic contains two identical messages: the same primary key, both INSERT operations. Is there any way to keep the job from failing in this situation?

The official documentation [1] says to set table.exec.source.cdc-events-duplicate on the job, which effectively adds a stateful operator for deduplication. If the table has a very large number of distinct primary keys, won't that state consume a lot of memory? And if the job also has a state TTL configured, could that prevent exact deduplication? Thanks!

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/#duplicate-change-events
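Conceptually, the option adds a keyed stateful operator that drops a change event identical to the last one seen for that primary key. A minimal Python sketch of the idea (an illustration only, not the PyFlink API), which also shows why the state scales with the number of distinct keys:

```python
# Conceptual sketch of what table.exec.source.cdc-events-duplicate adds:
# per-primary-key state remembering the last row, so an exact duplicate
# change event (e.g. the same INSERT replayed into Kafka) is dropped.
seen = {}  # key -> last row; in Flink this is keyed state (often in RocksDB)

def dedupe(key, row):
    if seen.get(key) == row:
        return None      # duplicate change event: drop it
    seen[key] = row      # remember it; state grows with distinct keys
    return row           # new or changed row: emit downstream

assert dedupe(1, ("INSERT", "a")) is not None
assert dedupe(1, ("INSERT", "a")) is None  # the replayed duplicate is dropped
```

This also makes the TTL concern concrete: if an entry in `seen` expires before its duplicate arrives, the duplicate passes through, so the TTL must exceed the maximum expected gap between a record and its replay.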
Getting exception when writing to parquet file with generic types disabled
Hi,

I am trying to write data to parquet files using SQL insert statements. Generic types are disabled in the execution environment.

There are other queries running in the same job that are counting/aggregating data. Generic types are disabled as a performance optimization for those queries.

In this scenario, whenever I try to insert data into parquet files, I get an exception:

Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.

I get the above exception even when I test with a simple table that has no array or list data types.

Is there any way to write parquet files with generic types disabled?

Thanks and regards,
Aniket Sule.

Here is a way to reproduce what I am seeing. My actual source is Kafka with data in JSON format. Datagen is simply to quickly reproduce the scenario. The environment is Flink 1.17.0. I am using the SQL CLI.

set 'sql-client.verbose'='true';
set 'table.exec.source.idle-timeout'='1000';
set 'table.optimizer.join-reorder-enabled'='true';
set 'table.exec.mini-batch.enabled'='true';
set 'table.exec.mini-batch.allow-latency'='5 s';
set 'table.exec.mini-batch.size'='5000';
set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
set 'table.optimizer.distinct-agg.split.enabled'='true';
set 'table.exec.state.ttl'='360 s';
set 'pipeline.object-reuse'='true';
set 'pipeline.generic-types'='false';
set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';

CREATE TABLE source_t (
  order_number BIGINT,
  order_name STRING,
  risk FLOAT,
  order_time TIMESTAMP(3)
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE file_t (
  order_number BIGINT,
  order_name STRING,
  risk FLOAT,
  `year` STRING, `month` STRING, `day` STRING, `hour` STRING
) WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/data',
  'format' = 'parquet'
);

INSERT INTO file_t
SELECT order_number, order_name, risk,
  date_format(order_time, 'yyyy') AS `year`,
  date_format(order_time, 'MM') AS `month`,
  date_format(order_time, 'dd') AS `day`,
  date_format(order_time, 'HH') AS `hour`
FROM source_t;

Resulting exception:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., ]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
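As a side check, the partition-column expressions can be reproduced outside Flink. A rough Python equivalent of the four `date_format` calls (Flink's `date_format` takes Java SimpleDateFormat patterns; the `'yyyy'` year pattern is my assumption, since it appears blank in the original message):

```python
from datetime import datetime

order_time = datetime(2023, 5, 18, 21, 20)

# Python strftime equivalents of the SimpleDateFormat patterns:
year  = order_time.strftime("%Y")  # date_format(order_time, 'yyyy')
month = order_time.strftime("%m")  # date_format(order_time, 'MM')
day   = order_time.strftime("%d")  # date_format(order_time, 'dd')
hour  = order_time.strftime("%H")  # date_format(order_time, 'HH')

print(year, month, day, hour)  # 2023 05 18 21
```

Note that all four columns are plain strings in the sink schema, which confirms the exception cannot come from the table's own column types.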
Backpressure handling in FileSource APIs - Flink 1.16
Hello Community,

Do the FileSource APIs for Bulk and Record stream formats handle backpressure in any way, for example by slowing down how data is emitted further into the pipeline or how it is read from the source? Or do they provide any callback/handle so that an action can be taken? Can you please share details, if any?

Rgds,
Kamal
Re: Issue with Incremental window aggregation using Aggregate function.
Any thoughts on this?

On Fri, Apr 21, 2023 at 4:10 PM Sumanta Majumdar wrote:
> Hi,
>
> Currently we have a streaming use case: a Flink application running on a session cluster that reads table transaction events from an upstream Kafka topic, converts them to rows, deduplicates them to extract distinct rows, and persists them via a sink function to an external warehouse such as Vertica or Snowflake.
>
> What we initially observed with the TumblingWindow assigner implementation is that the state sizes grow without bound, even after tuning the RocksDB options and providing a good chunk of managed memory.
>
> We are able to read more than 15 records within a period of 4 minutes, which is our time window, set based on our requirements.
>
> One optimization suggested in the Flink docs to reduce the state size is incremental aggregation using the available reduce or aggregate functions.
>
> I implemented this using an aggregate function along with the window, but I am now observing that the consumption rate has dropped drastically since this change, which has reduced the overall throughput of the pipeline.
>
> Any thoughts as to why this can happen?
>
> --
> Thanks and Regards,
> Sumanta Majumdar

--
Thanks and Regards,
Sumanta Majumdar
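For context on the state-size argument in the question above: incremental aggregation keeps one fixed-size accumulator per key/window instead of buffering every record until the window fires. A minimal Python sketch of the difference (an illustration only, not the Flink API):

```python
# Incremental (AggregateFunction-style): state is one accumulator,
# updated per record; its size does not depend on the record count.
def incremental_count(events):
    acc = 0
    for _ in events:
        acc += 1
    return acc

# Buffering (plain window function): state holds every record
# until the window fires, so it grows with the input rate.
def buffered_count(events):
    buf = list(events)
    return len(buf)

assert incremental_count(range(1000)) == buffered_count(range(1000)) == 1000
```

The results are identical; only the state footprint differs, so a throughput drop after switching is more likely caused by per-record overhead in the accumulator path (e.g. serialization of the accumulator into RocksDB on every update) than by the aggregation semantics themselves.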
IRSA with Flink S3a connector
Hi,

I have a Flink job running on EKS, reading and writing data records to S3 buckets. I am trying to set up access credentials via AWS IAM roles for service accounts (IRSA), following: https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html

I have configured com.amazonaws.auth.WebIdentityTokenCredentialsProvider as the credential provider in flink-conf.yaml for the Hadoop S3A connector, and annotated my service account with the role. When running the job, I am getting an access denied error.

Exception:

Caused by: com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException: Not authorized to perform sts:AssumeRoleWithWebIdentity (Service: AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied; Request ID: 923df33a-802e-47e2-a203-0841aca03dd8; Proxy: null)

I have tried to access the S3 buckets from the AWS CLI running in a pod with the same service account, and that works.

Am I using the correct credential provider for this IAM integration? I am not sure whether Hadoop S3A supports it: https://issues.apache.org/jira/browse/HADOOP-18154

Please advise if I am doing anything wrong in setting up credentials via IAM.

Regards,
Anuj Jain
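For reference, a minimal flink-conf.yaml sketch of the setup being described (assumptions: the flink-s3-fs-hadoop plugin is installed, the AWS STS SDK classes are available to it, and EKS injects AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE into the pod via the service-account annotation):

```
# Route s3:// / s3a:// credential lookup through the web-identity
# (IRSA) provider instead of the default provider chain.
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
```

Since the AWS CLI works from the same pod, a 403 on sts:AssumeRoleWithWebIdentity often points at the role's trust policy (OIDC audience/subject conditions) rather than the Flink side, though that is a guess without seeing the policy.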