Re: Getting exception when writing to parquet file with generic types disabled

2023-05-18 Thread Shammon FY
Hi Aniket,

Currently the filesystem connector does not support the option
'pipeline.generic-types'='false', because the connector outputs
`PartitionCommitInfo` messages for the downstream partition committer
operator even when the sink table has no partitions. `PartitionCommitInfo`
contains a `List<String> partitions` field, which causes the exception you
mentioned in the thread. I have created an issue [1] for this.

[1] https://issues.apache.org/jira/browse/FLINK-32129
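
Until that is fixed, one possible interim workaround (an untested sketch, not a
fix for FLINK-32129 itself) is to run the parquet insert from a separate
environment/job that keeps 'pipeline.generic-types' at its default, while the
aggregation queries keep it disabled:

```
from pyflink.table import EnvironmentSettings, TableEnvironment

# Environment for the aggregation queries: generic types disabled as a
# performance optimization, as in your current setup.
agg_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
agg_env.get_config().set("pipeline.generic-types", "false")

# Separate environment (i.e. a separate job) for the filesystem/parquet insert:
# leave 'pipeline.generic-types' at its default, because the sink emits
# PartitionCommitInfo, which carries a java.util.List field.
file_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# register source_t / file_t here and run the INSERT from this environment
```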

Best,
Shammon FY


On Thu, May 18, 2023 at 9:20 PM Aniket Sule wrote:

> Hi,
>
> I am trying to write data to parquet files using SQL insert statements.
> Generic types are disabled in the execution environment.
>
> There are other queries running in the same job that are
> counting/aggregating data. Generic types are disabled as a performance
> optimization for those queries.
>
>
>
> In this scenario, whenever I try to insert data into parquet files, I get
> an exception -
>
> Caused by: java.lang.UnsupportedOperationException: Generic types have
> been disabled in the ExecutionConfig and type java.util.List is treated as
> a generic type.
>
>
>
> I get the above exception even when I test with a simple table that has no
> array or list data types.
>
>
>
> Is there any way to write parquet files with generic types disabled?
>
>
>
> Thanks and regards,
>
> Aniket Sule.
>
>
>
>
>
> Here is a way to reproduce what I am seeing.
>
> My actual source is Kafka with data that is in json format.
>
> Datagen is simply to quickly reproduce the scenario.
>
> The environment is Flink 1.17.0.
>
> I am using the SQL cli.
>
>
>
> set 'sql-client.verbose'='true';
>
> set 'table.exec.source.idle-timeout'='1000';
>
> set 'table.optimizer.join-reorder-enabled'='true';
>
> set 'table.exec.mini-batch.enabled'='true';
>
> set 'table.exec.mini-batch.allow-latency'='5 s';
>
> set 'table.exec.mini-batch.size'='5000';
>
> set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
>
> set 'table.optimizer.distinct-agg.split.enabled'='true';
>
> set 'table.exec.state.ttl'='360 s';
>
> set 'pipeline.object-reuse'='true';
>
> set 'pipeline.generic-types'='false';
>
> set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';
>
>
>
> CREATE TABLE source_t (
>
>   order_number BIGINT,
>
>   order_name string,
>
>   risk float,
>
>   order_time   TIMESTAMP(3)
>
>   ) WITH (
>
> 'connector' = 'datagen'
>
>   );
>
>
>
> CREATE TABLE file_t (
>
>   order_number BIGINT,
>
>   order_name string,
>
>   risk float,
>
>   `year` string,`month` string,`day` string,`hour` string
>
>   ) WITH (
>
> 'connector'='filesystem',
>
> 'path' = '/tmp/data',
>
> 'format'='parquet'
>
>   );
>
>
>
> insert into file_t
>
> select order_number,order_name,risk ,
>
> date_format(order_time,'yyyy') as `year`, date_format(order_time,'MM') as `month`,
> date_format(order_time,'dd') as `day`, date_format(order_time,'HH') as `hour`
>
> from source_t;
>
>
>
> Resulting exception:
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error., 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException:
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> fetchResults.
>
> at
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
>
> at
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>
> at
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>
> at
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>
> at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>
> at
> java.base/java.util.Optional.ifPresent(Optional.java:183)
>
> at
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>
> at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>
> at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>
> at
> 

python udf with flinksql

2023-05-18 Thread tom yang
Hi


I am trying to create a Flink SQL program using a Python UDF with
metrics. This is my sample Python file:

custom_udf_2.py

```
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

class MyUDF(ScalarFunction):

  def __init__(self):
    self.counter = None

  def open(self, function_context):
    self.counter = function_context.get_metric_group().counter("my_counter")

  def eval(self, x, y):
    self.counter.inc()
    return x + y

```

This is my sql script

```
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE TABLE datagen (
a BIGINT,
b BIGINT
) WITH (
'connector' = 'datagen',
'fields.a.kind'='sequence',
'fields.a.start'='1',
'fields.a.end'='8',
'fields.b.kind'='sequence',
'fields.b.start'='4',
'fields.b.end'='11'
);

CREATE TABLE print_sink (
`sum` BIGINT
) WITH (
'connector' = 'print'
);


INSERT into print_sink (
select add(a,b) FROM datagen
);

```

When I try to execute this program, I get the following:


```
/bin/sql-client.sh -f ~/python_udf_lab.sql --pyFiles ~/custom_udf_2.py

Flink SQL> [INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE datagen (
> a BIGINT,
> b BIGINT
> ) WITH (
> 'connector' = 'datagen',
> 'fields.a.kind'='sequence',
> 'fields.a.start'='1',
> 'fields.a.end'='8',
> 'fields.b.kind'='sequence',
> 'fields.b.start'='4',
> 'fields.b.end'='11'
> )[INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE print_sink (
> `sum` BIGINT
> ) WITH (
> 'connector' = 'print'
> )[INFO] Execute statement succeed.

Flink SQL>
>
> INSERT into print_sink (
> select add(a,b) FROM datagen
> )[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function
'custom_udf_2.MyUDF' failed.
```


I've tried multiple variations of:
CREATE FUNCTION add AS 'custom_udf_2.MyUDF'
LANGUAGE PYTHON;

CREATE FUNCTION add AS 'MyUDF'
LANGUAGE PYTHON;


FYI, this is on Flink 1.16.1 and Python 3.9.13.

Admittedly, I haven't found this usage covered in the official documentation.
Is this use case currently supported?
I know that it works with SQL if I change the add function to:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
  return i + j

but then it doesn't create any metrics.
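
One variation I have been meaning to try (an untested sketch; the module-level
name `my_udf` is just a placeholder I made up) is to keep the class-based UDF
for the metric support and expose it through a `udf(...)` wrapper that
`CREATE FUNCTION` can reference:

```
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class MyUDF(ScalarFunction):

  def __init__(self):
    self.counter = None

  def open(self, function_context):
    self.counter = function_context.get_metric_group().counter("my_counter")

  def eval(self, x, y):
    self.counter.inc()
    return x + y

# Module-level wrapper object; the idea is to reference it from SQL as
#   CREATE FUNCTION add AS 'custom_udf_2.my_udf' LANGUAGE PYTHON;
my_udf = udf(MyUDF(), result_type=DataTypes.BIGINT())
```

With that, the SQL would point at 'custom_udf_2.my_udf' instead of
'custom_udf_2.MyUDF', but I have not verified whether the metrics then show up.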

Does anyone have any idea how I can get this to work, specifically with
Flink SQL and Python UDF metrics?

Thanks,
Tom


Question about the table.exec.source.cdc-events-duplicate option

2023-05-18 Thread casel.chen
MySQL binlog change records are sent to a Kafka topic in canal-json format, and
a Flink SQL job replicates them in real time into another MySQL database. Today
the streaming job failed with a duplicate key error when writing to MySQL.
After investigating, I found that the Kafka topic contains two identical
messages, i.e. two INSERT events with the same primary key. Is there any way to
keep the job from failing in this situation?


The official documentation [1] says to add the option
table.exec.source.cdc-events-duplicate to the job, which effectively adds a
stateful deduplication operator. If the table has a very large number of
distinct primary keys, won't that state consume a lot of memory? And if the job
also has a state TTL configured, could that break exact deduplication? Thanks!


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/canal/#duplicate-change-events
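
For reference, this is roughly how I understand the option is meant to be used,
together with a PRIMARY KEY on the canal-json source (the table and topic names
below are made up for illustration):

```
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Tell the planner that the CDC source may contain duplicate change events;
# Flink then keeps per-key state on the declared primary key to drop duplicates.
t_env.get_config().set("table.exec.source.cdc-events-duplicate", "true")

t_env.execute_sql("""
    CREATE TABLE orders_cdc (
        id BIGINT,
        amount DECIMAL(10, 2),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_binlog',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'canal-json'
    )
""")
```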

Getting exception when writing to parquet file with generic types disabled

2023-05-18 Thread Aniket Sule
Hi,
I am trying to write data to parquet files using SQL insert statements. Generic 
types are disabled in the execution environment.
There are other queries running in the same job that are counting/aggregating 
data. Generic types are disabled as a performance optimization for those 
queries.

In this scenario, whenever I try to insert data into parquet files, I get an 
exception -
Caused by: java.lang.UnsupportedOperationException: Generic types have been 
disabled in the ExecutionConfig and type java.util.List is treated as a generic 
type.

I get the above exception even when I test with a simple table that has no 
array or list data types.

Is there any way to write parquet files with generic types disabled?

Thanks and regards,
Aniket Sule.


Here is a way to reproduce what I am seeing.
My actual source is Kafka with data that is in json format.
Datagen is simply to quickly reproduce the scenario.
The environment is Flink 1.17.0.
I am using the SQL cli.

set 'sql-client.verbose'='true';
set 'table.exec.source.idle-timeout'='1000';
set 'table.optimizer.join-reorder-enabled'='true';
set 'table.exec.mini-batch.enabled'='true';
set 'table.exec.mini-batch.allow-latency'='5 s';
set 'table.exec.mini-batch.size'='5000';
set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
set 'table.optimizer.distinct-agg.split.enabled'='true';
set 'table.exec.state.ttl'='360 s';
set 'pipeline.object-reuse'='true';
set 'pipeline.generic-types'='false';
set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';

CREATE TABLE source_t (
  order_number BIGINT,
  order_name string,
  risk float,
  order_time   TIMESTAMP(3)
  ) WITH (
'connector' = 'datagen'
  );

CREATE TABLE file_t (
  order_number BIGINT,
  order_name string,
  risk float,
  `year` string,`month` string,`day` string,`hour` string
  ) WITH (
'connector'='filesystem',
'path' = '/tmp/data',
'format'='parquet'
  );

insert into file_t
select order_number,order_name,risk ,
date_format(order_time,'yyyy') as `year`, date_format(order_time,'MM') as `month`,
date_format(order_time,'dd') as `day`, date_format(order_time,'HH') as `hour`
from source_t;

Resulting exception:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., ]
at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
at 
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)


Backpressure handling in FileSource APIs - Flink 1.16

2023-05-18 Thread Kamal Mittal
Hello Community,

Do the FileSource APIs for the Bulk and Record stream formats handle
backpressure in any way, for example by slowing down how data is sent
further down the pipeline or how it is read from the source?
Or do they provide any callback/handle so that an action can be taken? Can
you please share details, if any?

Rgds,
Kamal


Re: Issue with Incremental window aggregation using Aggregate function.

2023-05-18 Thread Sumanta Majumdar
Any thoughts on this?
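
For reference, the change in question is essentially the incremental
aggregation pattern sketched below (simplified; the real job aggregates table
transaction rows, so the tuple type, the counting logic, and the 4-minute
processing-time window here are only illustrative):

```
from pyflink.common import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import AggregateFunction
from pyflink.datastream.window import TumblingProcessingTimeWindows

class CountAggregate(AggregateFunction):
    """Incremental aggregate: only the running count is kept as window state,
    instead of buffering every element until the window fires."""

    def create_accumulator(self):
        return 0

    def add(self, value, accumulator):
        return accumulator + 1

    def get_result(self, accumulator):
        return accumulator

    def merge(self, acc_a, acc_b):
        return acc_a + acc_b

env = StreamExecutionEnvironment.get_execution_environment()
counts = (
    env.from_collection([("k1", 1), ("k1", 2), ("k2", 3)])
       .key_by(lambda record: record[0])
       .window(TumblingProcessingTimeWindows.of(Time.minutes(4)))
       .aggregate(CountAggregate())
)
counts.print()
env.execute("incremental-window-aggregation-sketch")
```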

On Fri, Apr 21, 2023 at 4:10 PM Sumanta Majumdar wrote:

> Hi,
>
> Currently we have a streaming use case with a Flink application running on
> a session cluster. It reads table transaction events from an upstream Kafka
> topic, converts each event to a row, deduplicates the rows to extract
> distinct records, and persists them via a sink function to an external
> warehouse such as Vertica or Snowflake.
>
> Initially, with the TumblingWindow window assigner, we observed that the
> state size keeps growing even after tuning the RocksDB options and
> providing a good chunk of managed memory.
>
> We are able to read more than 15 records within a period of 4 minutes,
> which is the window size we set based on our requirements.
>
> One optimization suggested in the Flink docs to reduce the state size is
> incremental aggregation using the available reduce or aggregate functions.
>
> I did use an aggregate function along with the window to implement this,
> but I am now observing that the consumption rate has dropped drastically
> since this change, which has affected the overall throughput of the
> pipeline.
>
> Any thoughts as to why this can happen?
>
> --
> Thanks and Regards,
> Sumanta Majumdar
>


-- 
Thanks and Regards,
Sumanta Majumdar


IRSA with Flink S3a connector

2023-05-18 Thread Anuj Jain
Hi,
I have a flink job running on EKS, reading and writing data records to S3
buckets.
I am trying to set up access credentials via AWS IAM.
I followed this:
https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html

I have configured com.amazonaws.auth.WebIdentityTokenCredentialsProvider
as the credential provider in flink-conf.yaml for the Hadoop S3A connector,
and annotated my service account with the role.

When running the job, I am getting an access denied error.
Exception:
Caused by:
com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException:
Not authorized to perform sts:AssumeRoleWithWebIdentity (Service:
AWSSecurityTokenService; Status Code: 403; Error Code: AccessDenied;
Request ID: 923df33a-802e-47e2-a203-0841aca03dd8; Proxy: null)

I have tried to access S3 buckets from AWS CLI running in a pod with the
same service account and that works.

Am I using the correct credential provider for IAM integration? I am not
sure whether Hadoop S3A supports it:
https://issues.apache.org/jira/browse/HADOOP-18154

Please advise if I am doing anything wrong in setting up credentials via
IAM.

Regards
Anuj Jain