Re: flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-08 Thread Gyula Fóra
Hey!

Sounds like a bug :) Could you please open a jira / PR (in case you fixed
this already)?

Thanks
Gyula

On Mon, 8 May 2023 at 22:20, Andrew Otto  wrote:

> Hi,
>
> I'm trying to enable HA for flink-kubernetes-operator
> 
> with Helm.  We are using namespaced RBAC via watchedNamespaces.
>
> I've followed instructions and set
> kubernetes.operator.leader-election.enabled and
> kubernetes.operator.leader-election.lease-name, and increased replicas to
> 2.  When I deploy, the second replica comes online, but errors with:
>
> Exception occurred while acquiring lock 'LeaseLock: flink-operator -
> flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
> Failure executing: GET at:
> https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
> Message: Forbidden!Configured service account doesn't have access. Service
> account may have been revoked. leases.coordination.k8s.io
> "flink-operator-lease" is forbidden: User
> "system:serviceaccount:flink-operator:flink-operator" cannot get resource
> "leases" in API group "coordination.k8s.io" in the namespace
> "flink-operator".
>
> Looking at the rbac.yaml helm template
> ,
> it looks like the Role and RoleBindings that grant access to the leases
> resource are created for the configured watchNamespaces, but not for the
> namespace in which the flink-kubernetes-operator is deployed.  I think that
> for HA, the flink-kubernetes-operator is going to be asking k8s for Leases
> in its own namespace, right?
>
> Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I
> betcha I'm just doing something wrong (unless I'm the first person who's
> tried to use HA + namespaced RBAC with the helm charts?).
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>


Re: akka.remote.OversizedPayloadException after we upgrade to Flink 1.15

2023-05-08 Thread Shammon FY
Hi Wei,

From the error message, I guess the issue is that the events sent by the
SplitEnumerator to the source exceed Akka's default frame size. You can set the
'akka.framesize' option to increase the maximum Akka message size, or try to
decrease the event size.
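
For reference, a minimal flink-conf.yaml sketch (the value is illustrative; it
must exceed the largest observed payload, roughly 34 MB in the log quoted below,
and needs the trailing 'b' suffix):

# flink-conf.yaml (sketch) - raise the maximum Akka/RPC message size
akka.framesize: 41943040b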

When you use 'FlinkKafkaConsumer' to read data from Kafka, the source
subtask in the TaskManager connects to Kafka and reads data directly, but
'KafkaSource' does not work that way. You can refer to FLIP-27 [1] for more
detailed information about 'KafkaSource'. Simply put, the 'SplitEnumerator' in
the JobManager gets splits from Kafka and sends them to the source subtasks,
which then read the data.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Shammon FY


On Tue, May 9, 2023 at 2:37 AM Wei Hou via user 
wrote:

> Hi Team,
>
> We hit an issue after we upgraded our job from Flink 1.12 to 1.15: there's
> a consistent akka.remote.OversizedPayloadException after job restarts:
>
> Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to
> Actor[akka.tcp://flink@xxx/user/rpc/taskmanager_0#-311495648]: max
> allowed size 10485760 bytes, actual size of encoded class
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 33670549
> bytes.
>
>
> In the job, we changed the Kafka consumer from FlinkKafkaConsumer to the
> new KafkaSource, and we noticed a Stack Overflow post (
> https://stackoverflow.com/questions/75363084/jobs-stuck-while-trying-to-restart-from-a-checkpoint
> ) talking about the _metadata file size doubling after that change.
>
> We later checked the _metadata for our own job; it did increase a lot with
> each restart (around 128 MB when we hit the akka error). I'd like to know
> if there's a known root cause for this problem and what we can do here to
> eliminate it.
>
>
> Best,
> Wei
>


RE: MSI Auth to Azure Storage Account with Flink Apache Operator not working

2023-05-08 Thread DEROCCO, CHRISTOPHER
Shammon,



I’m still having trouble setting the package in my cluster environment. I have 
these lines added to my dockerfile


mkdir ./plugins/azure-fs-hadoop

cp ./opt/flink-azure-fs-hadoop-1.16.0.jar ./plugins/azure-fs-hadoop/

according to the Flink docs here
(https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/filesystems/azure/).
This should enable the flink-azure-fs-hadoop jar in the environment, which has
the classes needed for ADLS Gen2 MSI authentication.
I also have the following dependency in my pom to add it to the fat JAR.


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-azure-fs-hadoop</artifactId>
    <version>${flink.version}</version>
</dependency>


However, I still get the class-not-found error and the Flink job is not able to
authenticate to the Azure storage account to store its checkpoints. I'm not
sure what other configuration pieces I'm missing. Has anyone had success
writing checkpoints to Azure ADLS Gen2 storage with managed service
identity (MSI) authentication?



From: Shammon FY 
Sent: Friday, May 5, 2023 8:38 PM
To: DEROCCO, CHRISTOPHER 
Cc: user@flink.apache.org
Subject: Re: MSI Auth to Azure Storage Account with Flink Apache Operator not 
working

Hi DEROCCO,

I think you can check the startup command of the job on k8s to see if the jar 
file is in the classpath.

If your job is a DataStream job, you need to add the hadoop-azure dependency to
your project; if it is a SQL job, you need to include this jar file in your
Flink release package. Alternatively, you can add this package to your cluster
environment.

Best,
Shammon FY


On Fri, May 5, 2023 at 10:21 PM DEROCCO, CHRISTOPHER <cd9...@att.com> wrote:
How can I add the package to the flink job or check if it is there?

From: Shammon FY <zjur...@gmail.com>
Sent: Thursday, May 4, 2023 9:59 PM
To: DEROCCO, CHRISTOPHER <cd9...@att.com>
Cc: user@flink.apache.org
Subject: Re: MSI Auth to Azure Storage Account with Flink Apache Operator not 
working

Hi DEROCCO,

I think you need to check whether there is a hadoop-azure jar file in the 
classpath of your flink job. From an error message 'Caused by: 
java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.', your flink 
job may be missing this package.

Best,
Shammon FY


On Fri, May 5, 2023 at 4:40 AM DEROCCO, CHRISTOPHER <cd9...@att.com> wrote:

I receive the error:  Caused by: java.lang.ClassNotFoundException: Class 
org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.
I'm using Flink 1.16 running in Azure Kubernetes using the Apache Flink
Kubernetes Operator.
I have the following specified in spec.flinkConfiguration, as per the
Apache Flink Kubernetes Operator documentation.

fs.azure.createRemoteFileSystemDuringInitialization: "true"
fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net: OAuth
fs.azure.account.oauth.provider.type..dfs.core.windows.net: org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
fs.azure.account.oauth2.msi.tenant..dfs.core.windows.net:
fs.azure.account.oauth2.client.id..dfs.core.windows.net:
fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net: https://login.microsoftonline.com//oauth2/token

I also have this specified in the container environment variables.
- name: ENABLE_BUILT_IN_PLUGINS
   value: flink-azure-fs-hadoop-1.16.1.jar

I think I’m missing a configuration step because the MsiTokenProvider class is 
not found based on the logs. Any help would be appreciated.


Chris deRocco
Senior – Cybersecurity
Chief Security Office | STORM Threat Analytics

AT&T
Middletown, NJ
Phone: 732-639-9342
Email: cd9...@att.com



flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-08 Thread Andrew Otto
Hi,

I'm trying to enable HA for flink-kubernetes-operator

with Helm.  We are using namespaced RBAC via watchedNamespaces.

I've followed instructions and set
kubernetes.operator.leader-election.enabled and
kubernetes.operator.leader-election.lease-name, and increased replicas to
2.  When I deploy, the second replica comes online, but errors with:

Exception occurred while acquiring lock 'LeaseLock: flink-operator -
flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
Failure executing: GET at:
https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
Message: Forbidden!Configured service account doesn't have access. Service
account may have been revoked. leases.coordination.k8s.io
"flink-operator-lease" is forbidden: User
"system:serviceaccount:flink-operator:flink-operator" cannot get resource
"leases" in API group "coordination.k8s.io" in the namespace
"flink-operator".

Looking at the rbac.yaml helm template
,
it looks like the Role and RoleBindings that grant access to the leases
resource are created for the configured watchNamespaces, but not for the
namespace in which the flink-kubernetes-operator is deployed.  I think that
for HA, the flink-kubernetes-operator is going to be asking k8s for Leases
in its own namespace, right?
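
Until that is addressed, a rough sketch of the extra Role/RoleBinding that would
grant the operator's service account access to Leases in its own namespace. The
resource names below are illustrative and not taken from the Helm chart; the
service account and namespace names come from the error message above:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flink-operator-lease-role
  namespace: flink-operator
rules:
  - apiGroups: ["coordination.k8s.io"]
    resources: ["leases"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-operator-lease-role-binding
  namespace: flink-operator
subjects:
  - kind: ServiceAccount
    name: flink-operator
    namespace: flink-operator
roleRef:
  kind: Role
  name: flink-operator-lease-role
  apiGroup: rbac.authorization.k8s.io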

Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I
betcha I'm just doing something wrong (unless I'm the first person who's
tried to use HA + namespaced RBAC with the helm charts?).

Thanks!
-Andrew Otto
 Wikimedia Foundation


akka.remote.OversizedPayloadException after we upgrade to Flink 1.15

2023-05-08 Thread Wei Hou via user
Hi Team,

We hit an issue after we upgraded our job from Flink 1.12 to 1.15: there's
a consistent akka.remote.OversizedPayloadException after job restarts:

Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to
Actor[akka.tcp://flink@xxx/user/rpc/taskmanager_0#-311495648]: max allowed
size 10485760 bytes, actual size of encoded class
org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 33670549
bytes.


In the job, we changed the Kafka consumer from FlinkKafkaConsumer to the
new KafkaSource, and we noticed a Stack Overflow post (
https://stackoverflow.com/questions/75363084/jobs-stuck-while-trying-to-restart-from-a-checkpoint
) talking about the _metadata file size doubling after that change.

We later checked the _metadata for our own job; it did increase a lot with
each restart (around 128 MB when we hit the akka error). I'd like to know
if there's a known root cause for this problem and what we can do here to
eliminate it.


Best,
Wei


Mounting files into native k8s Flink pods

2023-05-08 Thread Edgar H
Hi all!

Running Flink on k8s (native, not using the operator), I was trying to mount
Hadoop configuration files into a certain directory within the pods but I can't
manage to do so. My existing deployment consists of a Job spec which launches a
.sh file that triggers the flink run-application bit, which in turn launches a
Pod that subsequently triggers the TMs and all the related stuff.

At first I tried to mount the files as a volume within the Job spec via
flink-main-container, but it was unsuccessful and nothing was mounted in the
resulting Pod launched by the Job. Later I tried with the example
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#example-of-pod-template
but can't really get it to work... As of now, I've got no Pod definitions since
those are automatically created by Flink when launching the previously
mentioned Job.

The way I see it, a Pod template would be decoupled from the Pod launched by the
Job, so they won't be related in any way, am I right? Mainly because of the
initContainer bit. Is there any other way to mount files into the pods launched
by a Job? If this is the only way, how should I spec my Pod template to do so?
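
A rough sketch of what such a pod template could look like, assuming the Hadoop
config files already exist in a ConfigMap. All names and paths below are
illustrative, and the template would be handed to flink run-application via the
kubernetes.pod-template-file option (the exact option name may vary by Flink
version):

apiVersion: v1
kind: Pod
metadata:
  name: pod-template
spec:
  containers:
    # Flink merges this entry into the containers it creates; the name must be
    # flink-main-container for the merge to apply.
    - name: flink-main-container
      volumeMounts:
        - name: hadoop-conf
          mountPath: /opt/hadoop/conf
  volumes:
    - name: hadoop-conf
      configMap:
        name: my-hadoop-conf   # hypothetical ConfigMap with core-site.xml / hdfs-site.xml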


Question about RexNodeExtractor formatting UDF names

2023-05-08 Thread Chai Kelun
Hi Flink Team:

I have a question about RexNodeExtractor in Flink 1.16.0. I am trying to push
down UDFs (with function names in the format ST_XXX, including underscores,
e.g. ST_Contains) into TableSourceScan, and I have implemented applyFilters and
the handling of pushdown functions based on the SupportsFilterPushDown API.
However, line 529 of RexNodeExtractor removes the underscore from the function
name, which prevents the operator from being pushed down.
I tried hacking this logic and comparing the query plan:
[Before modifying RexNodeExtractor#replace()]

== Abstract Syntax Tree ==
LogicalProject(id=[$0], name=[$1])
+- LogicalFilter(condition=[ST_Contains($2, ST_MakePoint(6, 2))])
   +- LogicalTableScan(table=[[default_catalog, default_database, extTable]])

== Optimized Physical Plan ==
Calc(select=[id, name], where=[ST_Contains(fence, ST_MakePoint(6, 2))])
+- TableSourceScan(table=[[default_catalog, default_database, extTable]], 
fields=[id, name, fence])

== Optimized Execution Plan ==
Calc(select=[id, name], where=[ST_Contains(fence, ST_MakePoint(6, 2))])
+- TableSourceScan(table=[[default_catalog, default_database, extTable]], 
fields=[id, name, fence])


[After modifying RexNodeExtractor#replace()]
== Abstract Syntax Tree ==
LogicalProject(id=[$0], name=[$1])
+- LogicalFilter(condition=[ST_Contains($2, ST_MakePoint(6, 2))])
   +- LogicalTableScan(table=[[default_catalog, default_database, extTable]])

== Optimized Physical Plan ==
Calc(select=[id, name])
+- TableSourceScan(table=[[default_catalog, default_database, extTable, 
filter=[ST_Contains(fence, ST_MakePoint(6, 2))]]], fields=[id, name, fence])

== Optimized Execution Plan ==
Calc(select=[id, name])
+- TableSourceScan(table=[[default_catalog, default_database, extTable, 
filter=[ST_Contains(fence, ST_MakePoint(6, 2))]]], fields=[id, name, fence])

We can see that the operator cannot be pushed down because the function name
is reformatted. However, according to the spatial query standard, geospatial
function names are in the ST_xxx format. I would like to ask what the idea
behind this design is?

Best regards,
Kelun


Re: Encryption of parameters in flink-conf.yaml

2023-05-08 Thread Biao Geng
Hi Anuj,

To the best of my knowledge, Flink does not provide such encryption support
for now. If you are running Flink on k8s, it is possible to achieve
encryption of the parameters using an init container. You can check this SO post
for more detailed instructions.
Besides, it should be possible to override the Configuration object in your job
code. Are you using Application mode to run the job?
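
A minimal sketch of the "override the Configuration object" idea mentioned above
(Application mode assumed; the environment variable names are illustrative, e.g.
populated from a k8s Secret). Note that, as Anuj reports below, the S3 filesystem
may already be initialized from the cluster-level configuration before the job
runs, so this alone may not be enough for filesystem credentials:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3CredentialOverrideSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Read the secrets from the environment instead of flink-conf.yaml.
        conf.setString("s3.access.key", System.getenv("S3_ACCESS_KEY"));
        conf.setString("s3.secret.key", System.getenv("S3_SECRET_KEY"));

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromElements(1, 2, 3).print();  // placeholder pipeline
        env.execute("s3-credential-override-sketch");
    }
}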

Best regards,
Biao Geng

Anuj Jain  wrote on Mon, May 8, 2023 at 13:55:

> Hi Community,
> I am trying to create an Amazon S3 filesystem distributor using Flink, and
> for this I am using the Hadoop S3A connector with the Flink filesystem sink.
> My Flink application would run in a non-AWS environment, on a native
> cluster, so I need to put my access keys in the Flink configuration.
>
> For connecting to S3 storage, i am configuring flink-conf.yaml with the
> access credentials like
> s3.access.key: 
> s3.secret.key: 
> ... and some other parameters required for assuming AWS IAM role with s3a
> AssumedRoleCredentialProvider
>
> Is there a way to encrypt these parameters rather than putting them
> directly or is there any other way to supply them programmatically.
>
> I tried to set them programmatically using the Configuration object and
> supplying them with
> StreamExecutionEnvironment.getExecutionEnvironment(Configuration), in my
> job (rather than from flink-conf.yaml) but then the S3 connection failed. I
> think flink creates the connection pool at startup even before the job is
> started.
>
> Thanks and Regards
> Anuj Jain
>


Re: Flink SQL Async UDF

2023-05-08 Thread Giannis Polyzos
What I'm curious about is the CompletableFuture in the function signature
(eval):

public final void eval(CompletableFuture> future,
Object... keys)

Is it injected automatically?

Best

On Mon, May 8, 2023 at 10:02 AM Giannis Polyzos 
wrote:

> Hi Biao,
> yeah, this means that since it's a Table Function UDF, I need to combine
> it with lateral correct?
> I have tried both approaches to be honest. You can find attached the log
> output with verbose=true for both queries
>
> On Mon, May 8, 2023 at 9:49 AM Biao Geng  wrote:
>
>> Hi Giannis,
>> Would you mind executing SET 'sql-client.verbose' = 'true'; first in
>> your SQL client and then running the above example to show the error output in
>> the SQL client?
>> For a complete example, you can refer to the code of
>> HBaseRowDataAsyncLookupFunction.
>> Besides, in your example you actually implement an async user-defined
>> table function (see here for more details about UDTF usage), not a scalar
>> UDF. The usage could be different.
>>
>> Best,
>> Biao Geng
>>
>> Giannis Polyzos  wrote on Mon, May 8, 2023 at 14:27:
>>
>>> just to illustrate with a simple example
>>>
>>> On Mon, May 8, 2023 at 7:20 AM Giannis Polyzos 
>>> wrote:
>>>
 Yes I have and the function is present in the catalog.
 And I have other udfs also registered, I’m just having issues with the
 Async one and can’t really find any examples.

 Best

 On Mon, 8 May 2023 at 3:46 AM, Shammon FY  wrote:

> Hi Giannis,
>
> Have you used "CREATE FUNCTION asyncfn AS 'Your full class name of
> async function class'" or "CREATE TEMPORARY FUNCTION asyncfn AS 'Your full
> class name of async function class'" to create a customized function named
> "asyncfn" before it is used in your sql?
>
> The error message "No match found for function signature" usually
> indicates that the function does not exist or the parameters do not match.
>
> Best,
> Shammon FY
>
> On Sun, May 7, 2023 at 2:55 PM Giannis Polyzos 
> wrote:
>
>> I can't really find any examples / docs for Flink's
>> AsyncTableFunction and I have a hard time getting it to work.
>> Is there any example you can share that just takes as input a String
>> key and outputs, let's say, a record (returned by the lookup)?
>> Also it's not clear to me how the async happens internally.
>> Is the future in the eval method signature used?
>>
>> I tried implementing eval methods like:
>> *public final void eval(CompletableFuture>
>> future, Object... keys)*
>>
>> *or *
>>
>>
>> *public void eval(CompletableFuture> result, String
>> rowkey)*
>>
>> but in both cases if I do something like
>> *SELECT asyncfn(accountId) from transactions;*
>> I get
>> *org.apache.calcite.sql.validate.SqlValidatorException: No match
>> found for function signature asyncfn()*
>>
>> Not sure what I am missing
>>
>> Thanks,
>> Giannis
>>
>


Re: Flink SQL Async UDF

2023-05-08 Thread Giannis Polyzos
Hi Biao,
yeah, this means that since it's a Table Function UDF, I need to combine it
with LATERAL, correct?
I have tried both approaches to be honest. You can find attached the log
output with verbose=true for both queries

On Mon, May 8, 2023 at 9:49 AM Biao Geng  wrote:

> Hi Giannis,
> Would you mind executing SET 'sql-client.verbose' = 'true'; first in
> your SQL client and then running the above example to show the error output in
> the SQL client?
> For a complete example, you can refer to the code of
> HBaseRowDataAsyncLookupFunction.
> Besides, in your example you actually implement an async user-defined
> table function (see here for more details about UDTF usage), not a scalar
> UDF. The usage could be different.
>
> Best,
> Biao Geng
>
> Giannis Polyzos  wrote on Mon, May 8, 2023 at 14:27:
>
>> just to illustrate with a simple example
>>
>> On Mon, May 8, 2023 at 7:20 AM Giannis Polyzos 
>> wrote:
>>
>>> Yes I have and the function is present in the catalog.
>>> And I have other udfs also registered, I’m just having issues with the
>>> Async one and can’t really find any examples.
>>>
>>> Best
>>>
>>> On Mon, 8 May 2023 at 3:46 AM, Shammon FY  wrote:
>>>
 Hi Giannis,

 Have you used "CREATE FUNCTION asyncfn AS 'Your full class name of async
 function class'" or "CREATE TEMPORARY FUNCTION asyncfn AS 'Your full class
 name of async function class'" to create a customized function named
 "asyncfn" before it is used in your sql?

 The error message "No match found for function signature" usually
 indicates that the function does not exist or the parameters do not match.

 Best,
 Shammon FY

 On Sun, May 7, 2023 at 2:55 PM Giannis Polyzos 
 wrote:

> I can't really find any examples / docs for Flink's AsyncTableFunction
> and I have a hard time getting it to work.
> Is there any example you can share that just takes as input a String
> key and outputs, let's say, a record (returned by the lookup)?
> Also it's not clear to me how the async happens internally.
> Is the future in the eval method signature used?
>
> I tried implementing eval methods like:
> *public final void eval(CompletableFuture> future,
> Object... keys)*
>
> *or *
>
>
> *public void eval(CompletableFuture> result, String
> rowkey)*
>
> but in both cases if I do something like
> *SELECT asyncfn(accountId) from transactions;*
> I get
> *org.apache.calcite.sql.validate.SqlValidatorException: No match found
> for function signature asyncfn()*
>
> Not sure what I am missing
>
> Thanks,
> Giannis
>

Flink SQL> SELECT customasync(accountId) from transactions;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server 
error., )
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:187)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at 
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:191)
at 
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at 
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
at 
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
... 7 more
Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
column 8 to line 1, column 29: No match found for function signature 
customasync()
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
 Method)
at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown
 Source)
at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
 Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:505)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:932)
at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:917)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5163)
at 

Re: Flink SQL Async UDF

2023-05-08 Thread Biao Geng
Hi Giannis,
Would you mind executing SET 'sql-client.verbose' = 'true'; first in your
SQL client and then running the above example to show the error output in the
SQL client?
For a complete example, you can refer to the code of
HBaseRowDataAsyncLookupFunction.
Besides, in your example you actually implement an async user-defined
table function (see here for more details about UDTF usage), not a scalar UDF.
The usage could be different.
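
To make the eval shape concrete, below is a minimal sketch modelled on the
connector lookup functions mentioned above (a sketch under that assumption, not
a verified recipe for a CREATE FUNCTION UDF). The runtime supplies the
CompletableFuture when it calls eval; user code only completes it. The class
name and key handling are illustrative:

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncTableFunction;

public class MyAsyncLookupFunction extends AsyncTableFunction<RowData> {

    public void eval(CompletableFuture<Collection<RowData>> future, Object key) {
        // Hand the work to an async client / thread pool and complete the
        // future from its callback instead of blocking the caller.
        CompletableFuture
                .supplyAsync(() -> lookup(key))
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        future.completeExceptionally(error);
                    } else {
                        future.complete(rows);
                    }
                });
    }

    // Placeholder for the real external lookup (illustrative only).
    private Collection<RowData> lookup(Object key) {
        return Collections.singletonList(
                GenericRowData.of(StringData.fromString(String.valueOf(key))));
    }
}

For the built-in connectors, such async functions are wired in through lookup
joins rather than called directly like a scalar function, which may be related
to the "No match found for function signature" error seen when calling it that
way.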

Best,
Biao Geng

Giannis Polyzos  wrote on Mon, May 8, 2023 at 14:27:

> just to illustrate with a simple example
>
> On Mon, May 8, 2023 at 7:20 AM Giannis Polyzos 
> wrote:
>
>> Yes I have and the function is present in the catalog.
>> And I have other udfs also registered, I’m just having issues with the
>> Async one and can’t really find any examples.
>>
>> Best
>>
>> On Mon, 8 May 2023 at 3:46 AM, Shammon FY  wrote:
>>
>>> Hi Giannis,
>>>
>>> Have you used "CREATE FUNCTION asyncfn AS 'Your full class name of async
>>> function class'" or "CREATE TEMPORARY FUNCTION asyncfn AS 'Your full class
>>> name of async function class'" to create a customized function named
>>> "asyncfn" before it is used in your sql?
>>>
>>> The error message "No match found for function signature" usually
>>> indicates that the function does not exist or the parameters do not match.
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Sun, May 7, 2023 at 2:55 PM Giannis Polyzos 
>>> wrote:
>>>
 I can't really find any examples / docs for Flink's AsyncTableFunction
 and I have a hard time getting it to work.
 Is there any example you can share that just takes as input a String
 key and outputs, let's say, a record (returned by the lookup)?
 Also it's not clear to me how the async happens internally.
 Is the future in the eval method signature used?

 I tried implementing eval methods like:
 *public final void eval(CompletableFuture> future,
 Object... keys)*

 *or *


 *public void eval(CompletableFuture> result, String
 rowkey)*

 but in both cases if I do something like
 *SELECT asyncfn(accountId) from transactions;*
 I get
 *org.apache.calcite.sql.validate.SqlValidatorException: No match found
 for function signature asyncfn()*

 Not sure what I am missing

 Thanks,
 Giannis

>>>


Re: Flink Sql erroring at runtime

2023-05-08 Thread Hang Ruan
Hi, neha,

I think the error occurred during deserialization. Are there some
example data and runnable SQL statements to reproduce the problem?

Best,
Hang

neha goyal  wrote on Tue, May 2, 2023 at 16:33:

> Hello,
>
> I am using Flink 1.16.1 and observing a different behavior from Flink
> 1.13.6.
>
> SELECT if(some_string_field is null, 'default', 'some_string_field') from
> my_stream
>
> This Flink SQL job in the streaming environment is erroring out at
> runtime with the exception mentioned below. There are no null values sent,
> and it is failing for non-null values as well.
>
> It runs fine in Flink 1.13.6. Also, if I use the Integer field, it
> runs fine.
> Was there any change around this in Flink 1.14/1.15/1.16?
>
> io.IOException: Failed to deserialize consumer record due to
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io
> .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:750)
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
> at
> org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
> ... 14 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
> at
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.processElement(InputConversionOperator.java:128)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> ... 22 more
> Caused by: java.lang.NullPointerException
> at
> StreamExecCalc$53548.processElement_trueFilter10044_split10048(Unknown
> Source)
> at StreamExecCalc$53548.processElement_trueFilter10044(Unknown Source)
> at StreamExecCalc$53548.processElement_split10047(Unknown Source)
> at StreamExecCalc$53548.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> ... 28 more
>

Re: Query on flink-parquet

2023-05-08 Thread Hang Ruan
Hi, Anuj,

Classes annotated with @Internal or @Experimental can be changed between any
two releases. Classes annotated with @PublicEvolving can only be changed
between minor releases (e.g. between 1.17.0 and 1.18.0).
So the classes you mentioned may change. If the API changes in a new
release, you will have to modify your code to use the new release.

Best,
Hang

Anuj Jain  wrote on Mon, May 8, 2023 at 13:28:

>
>> Hi Community,
>>
>>
>> I am trying to use flink-parquet for reading and writing parquet files
>> from the Flink filesystem connectors.
>>
>> In the file source, I would be decoding Parquet files and converting them to
>> Avro records, and similarly in the file sink I would be encoding Avro records
>> to Parquet files.
>>
>>
>> For collection I am using
>>
>> BulkFormat bulkFormat =
>> new
>> StreamFormatAdapter<>(AvroParquetReaders.forSpecificRecord(recordClazz));
>> FileSource source = FileSource.forBulkFileFormat(bulkFormat,
>> path).build();
>>
>>
>> and for sinking I am using
>>
>> FileSink sink = FileSink.forBulkFormat(path,
>> AvroParquetWriters.forSpecificRecord(recordClazz)).build()
>>
>>
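
For readability, here is the same construction with the generic parameters
(which the archive stripped above) restored, as a sketch assuming the record
class extends Avro's SpecificRecordBase:

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;

public class ParquetAvroIoSketch {

    public static <T extends SpecificRecordBase> FileSource<T> buildSource(
            Class<T> recordClazz, Path path) {
        // Parquet files decoded into Avro specific records
        BulkFormat<T, FileSourceSplit> bulkFormat =
                new StreamFormatAdapter<>(AvroParquetReaders.forSpecificRecord(recordClazz));
        return FileSource.forBulkFileFormat(bulkFormat, path).build();
    }

    public static <T extends SpecificRecordBase> FileSink<T> buildSink(
            Class<T> recordClazz, Path path) {
        // Avro specific records encoded as Parquet files
        return FileSink.forBulkFormat(
                        path, AvroParquetWriters.forSpecificRecord(recordClazz))
                .build();
    }
}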
>> Query: The StreamFormatAdapter class is marked @Internal, and the
>> AvroParquetReaders and AvroParquetWriters classes are marked @Experimental –
>> does this mean that in future Flink releases these classes can be changed in a
>> non-backward-compatible way, e.g. by plugging in a different 3PP rather than
>> “parquet-avro” or by changing the API structure, thus impacting the application
>> code?
>>
>> Would it be safe to use the code as specified above?
>>
>>
>> Thanks and Regards
>>
>> Anuj
>>
>