Re: Flink File Source: File read strategy

2023-09-24 Thread Shammon FY
Hi Kirti,

I think you can refer to doc [1] and create a table backed by your S3 file system
(put your S3 path in the `path` field), then submit jobs to write and read
data with S3.

You can refer to [2] if your jobs use the `DataStream` API.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
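
For illustration, here is a minimal sketch of such a table definition submitted from a Java program; the bucket, path, schema and format below are placeholders, and the S3 filesystem plugin plus credentials must be set up as described in [2]:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class S3FilesystemTableExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Hypothetical table backed by an S3 path; adjust bucket, path and schema.
        tEnv.executeSql(
            "CREATE TABLE s3_events (" +
            "  id BIGINT," +
            "  message STRING" +
            ") WITH (" +
            "  'connector' = 'filesystem'," +
            "  'path' = 's3://my-bucket/events/'," +
            "  'format' = 'csv'" +
            ")");

        // Write and read through the same table definition.
        tEnv.executeSql("INSERT INTO s3_events VALUES (1, 'hello')").await();
        tEnv.executeSql("SELECT * FROM s3_events").print();
    }
}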

Best,
Shammon FY

On Mon, Sep 25, 2023 at 12:36 PM Kirti Dhar Upadhyay K <
kirti.k.dhar.upadh...@ericsson.com> wrote:

> Thanks Shammon.
>
> Is there any way to verify that File Source reads files directly from S3?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>
> *From:* Shammon FY 
> *Sent:* 25 September 2023 06:27
> *To:* Kirti Dhar Upadhyay K 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink File Source: File read strategy
>
>
>
> Hi Kirti,
>
>
>
> I think the default file `Source` does not download files locally
> in Flink, but reads them directly from S3. However, Flink also supports
> configuring temporary directories through `io.tmp.dirs`. If it is a
> user-defined source, the temporary directory can be obtained from FlinkS3FileSystem. After the
> Flink job is completed, the directory will be cleaned up.
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user <
> user@flink.apache.org> wrote:
>
> Hi Community,
>
>
>
> I am using Flink File Source with Amazon S3.
>
> Please help me with the questions below:
>
>
>
>    1. When the Split Enumerator assigns a split to the Source Reader, does it
>    download the file temporarily and then start reading/decoding the records
>    from the file, or does it create a direct stream with S3?
>
>
>
>    2. If it is downloaded locally, then to which path? Is it configurable?
>
>
>
>    3. Does this temporary file get deleted automatically, or is explicit
>    cleanup required?
>
>
>
>
>
> Regards,
>
> Kirti Dhar
>
>


Re: Flink File Source: File read strategy

2023-09-24 Thread Shammon FY
Hi Kirti,

I think the default file `Source` does not download files locally in Flink,
but reads them directly from S3. However, Flink also supports configuring
temporary directories through `io.tmp.dirs`. If it is a user-defined
source, the temporary directory can be obtained from FlinkS3FileSystem. After the Flink job is
completed, the directory will be cleaned up.
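
For illustration, a minimal sketch of overriding `io.tmp.dirs` when testing locally; in a real deployment this option is normally set in flink-conf.yaml, and the path below is just a placeholder:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TmpDirsExample {
    public static void main(String[] args) throws Exception {
        // Point Flink's temporary directories at a custom path (placeholder).
        Configuration conf = new Configuration();
        conf.setString("io.tmp.dirs", "/data/flink-tmp");

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(conf);

        env.fromElements("a", "b", "c").print();
        env.execute("io-tmp-dirs-example");
    }
}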

Best,
Shammon FY

On Fri, Sep 22, 2023 at 3:11 PM Kirti Dhar Upadhyay K via user <
user@flink.apache.org> wrote:

> Hi Community,
>
>
>
> I am using Flink File Source with Amazon S3.
>
> Please help me with the questions below:
>
>
>
>    1. When the Split Enumerator assigns a split to the Source Reader, does it
>    download the file temporarily and then start reading/decoding the records
>    from the file, or does it create a direct stream with S3?
>
>
>
>    2. If it is downloaded locally, then to which path? Is it configurable?
>
>
>
>    3. Does this temporary file get deleted automatically, or is explicit
>    cleanup required?
>
>
>
>
>
> Regards,
>
> Kirti Dhar
>


Re: How to read flinkSQL job state

2023-09-06 Thread Shammon FY
Hi Yifan,

Besides reading job state, I would like to know which state backend you are
using. Could you share the state and checkpoint configuration for your
job? You may want to check these configuration items first to confirm they are
correct.

Best,
Shammon FY

On Wed, Sep 6, 2023 at 3:17 PM Hang Ruan  wrote:

> Hi, Yifan.
>
> I think the document[1] means to let us convert the DataStream to the
> Table[2]. Then we could handle the state with the Table API & SQL.
>
> Best,
> Hang
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/libs/state_processor_api/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
>
> Yifan He via user  wrote on Wed, Sep 6, 2023 at 13:10:
>
>> Hi team,
>>
>> We are investigating why the checkpoint size of our FlinkSQL jobs keeps
>> growing and we want to look into the checkpoint file to know what is
>> causing the problem. I know we can use the state processor api to read the
>> state of jobs using datastream api, but how can I read the state of jobs
>> using table api & sql?
>>
>> Thanks,
>> Yifan
>>
>


Re: Job graph

2023-09-04 Thread Shammon FY
Hi Nikolaos,

As Ron said, the job graph is a low-level structure in Flink and is not
exposed to users at the moment. Currently you can get job details from
`RestClusterClient` via the method `getJobDetails(JobID jobId)`; the result
`JobDetailsInfo` contains all vertices in the job and the JSON-format job
plan.
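
For illustration, a minimal sketch of that call; the rest address and port are placeholders for your cluster, and the job id is passed in as an argument:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;

public class JobDetailsExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // placeholder
        conf.setInteger(RestOptions.PORT, 8081);          // placeholder

        try (RestClusterClient<String> client =
                 new RestClusterClient<>(conf, "session-cluster")) {
            JobDetailsInfo details =
                client.getJobDetails(JobID.fromHexString(args[0])).get();
            // JobDetailsInfo carries the job vertices and the JSON job plan.
            System.out.println(details);
        }
    }
}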

Best,
Shammon FY

On Sat, Sep 2, 2023 at 6:26 AM David Anderson  wrote:

> This may or may not help, but you can get the execution plan from
> inside the client, by doing something like this (I printed the plan to
> stderr):
>
> ...
> System.err.println(env.getExecutionPlan());
> env.execute("my job");
>
> The result is a JSON-encoded representation of the job graph, which
> for the simple example I just tried it with, produced this output:
>
> {
>   "nodes" : [ {
> "id" : 1,
> "type" : "Source: Custom Source",
> "pact" : "Data Source",
> "contents" : "Source: Custom Source",
> "parallelism" : 10
>   }, {
> "id" : 3,
> "type" : "Sink: Writer",
> "pact" : "Operator",
> "contents" : "Sink: Writer",
> "parallelism" : 10,
> "predecessors" : [ {
>   "id" : 1,
>   "ship_strategy" : "FORWARD",
>   "side" : "second"
> } ]
>   }, {
> "id" : 5,
> "type" : "Sink: Committer",
> "pact" : "Operator",
> "contents" : "Sink: Committer",
> "parallelism" : 10,
> "predecessors" : [ {
>   "id" : 3,
>   "ship_strategy" : "FORWARD",
>   "side" : "second"
> } ]
>   } ]
> }
>
> On Wed, Aug 30, 2023 at 7:01 AM Nikolaos Paraskakis
>  wrote:
> >
> > Hello folks,
> >
> > I am trying to get the job graph of a running flink job. I want to use
> flink libraries. For now, I have the RestClusterClient and the job IDs.
> Tell me please how to get the job graph.
> >
> > Thank you.
>


Re: Could not retrieve JobResults of globally-terminated jobs from JobResultStore

2023-08-31 Thread Shammon FY
Hi,

Is it a streaming job or a batch job that cannot be recovered? From the error, the job is already in a terminated state. You can check whether there are any other error logs to see why the job failed and exited.

Best,
Shammon FY

On Thu, Aug 31, 2023 at 7:47 PM denghaibin  wrote:

> After running for a while on flink-1.16.0, a large batch of jobs failed. The error log is below. Could someone please take a look at what the problem is?
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults
> of globally-terminated jobs from JobResultStore
>  at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_382]
>  at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_382]
>  at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_382]
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve
> JobResults of globally-terminated jobs from JobResultStore
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_382]
>  ... 3 more
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
>  at [Source: (org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream);
> line: 1, column: 0]
>  at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> ~[flink-dist-1.16.0.jar:1.16.0]
>  at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
> ~[flink-dist-1.16.0.jar:1.16.0]


Re: K8s operator - Stop Job with savepoint on session cluster via Java API

2023-08-31 Thread Shammon FY
Hi Krzysztof,

For a Flink session cluster, you can stop the job with a savepoint through
the statement `STOP JOB '{Your job id}' WITH SAVEPOINT;`. You can refer to
[1] for more information about how to do it in the SQL client, and you can also
create a table environment to execute the statement in your application.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job
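
For illustration, a hedged sketch of issuing that statement from an application through a table environment; whether a plain `TableEnvironment` accepts `STOP JOB` depends on your Flink version (the SQL client route in [1] is the documented path), and the job id below is a placeholder argument:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StopJobWithSavepointExample {
    public static void main(String[] args) {
        // Assumes a TableEnvironment configured to talk to your session cluster.
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Same statement you would run in the SQL client; args[0] is the job id.
        tEnv.executeSql("STOP JOB '" + args[0] + "' WITH SAVEPOINT").print();
    }
}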

Best,
Shammon FY

On Fri, Sep 1, 2023 at 6:35 AM Krzysztof Chmielewski <
krzysiek.chmielew...@gmail.com> wrote:

> Hi community,
> I would like to ask what is the recommended way to stop Flink job with
> save point on a session cluster via k8s operator Java API?
>
> Currently I'm doing this by setting savepointTriggerNonce on JobSpec
> object.
> However I've noticed that this works only if I do not include Job state
> change in that spec.
>
> In other words when I submit JobSpec that has state change from Running to
> Suspend and savepointTriggerNonce, the checkpoint is not created. Is that
> intended?
> In order to mimic [1] do I have to submit two JobSpec updates? One with
> savepointNonce and the second one with Job state change?
>
> A followup question, what kind of savepoint is triggered when using
> savepointTriggerNonce native or canonical? Also is there a way to pass
> --drain option or savepoint path via spec? (Not
> including state.savepoints.dir cluster config option)
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
>
> Thanks,
> Krzysztof Chmielewski
>


Re: How a Flink SQL statement is translated into underlying processing functions

2023-08-27 Thread Shammon FY
If you want to see which concrete execution steps a SQL statement is translated into, you can use the EXPLAIN syntax [1] to view the execution plan.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/
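
For example, a minimal sketch from a Java program; the `datagen` table below is just a placeholder source:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
            "CREATE TABLE orders (user_id BIGINT, amount DOUBLE) " +
            "WITH ('connector' = 'datagen')");

        // Prints the AST, the optimized plan and the physical execution plan,
        // i.e. the concrete steps the SQL is translated into.
        System.out.println(
            tEnv.explainSql("SELECT user_id, SUM(amount) FROM orders GROUP BY user_id"));
    }
}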

On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com> wrote:

> May I ask: after a Flink SQL statement is submitted and running, is it possible
> to find out what the underlying processing functions Flink translates it into look like? If stateful computation is involved, which state variables does Flink define for this SQL?
>
>
>


Re: TaskManagers Crushing

2023-08-21 Thread Shammon FY
Hi,

It seems that the node `tef-prod-flink-04/10.11.0.51:37505 [
tef-prod-flink-04:38835-e3ca4d ]` exited unexpectedly; you can check whether
there are any errors in the logs of the TM or K8S

Best,
Shammon FY


On Sun, Aug 20, 2023 at 5:42 PM Kenan Kılıçtepe 
wrote:

> Hi,
>
> Nothing interesting on the Kafka side, just some partition delete/create logs.
> Also I can't understand why all task managers stop at the same time
> without any error log.
>
> Thanks
> Kenan
>
>
>
> On Sun, Aug 20, 2023 at 10:49 AM liu ron  wrote:
>
>> Hi,
>>
>> Maybe you need to check what changed on the Kafka side at that time.
>>
>> Best,
>> Ron
>>
>> Kenan Kılıçtepe  wrote on Sun, Aug 20, 2023 at 08:51:
>>
>>> Hi,
>>>
>>> I have 4 task managers working on 4 servers.
>>> They all crash at the same time without any useful error logs.
>>> The only log I can see is some disconnections from Kafka for both consumers
>>> and producers.
>>> Any idea or any help is appreciated.
>>>
>>> Some logs from all taskmanagers:
>>>
>>> I think server 4 crashes first and that causes a crash for all
>>> taskmanagers.
>>>
>>> JobManager:
>>>
>>> 2023-08-18 15:16:46,528 INFO  org.apache.kafka.clients.NetworkClient
>>>   [] - [AdminClient clientId=47539-enumerator-admin-client]
>>> Node 2 disconnected.
>>> 2023-08-18 15:19:00,303 INFO  org.apache.kafka.clients.NetworkClient
>>>   [] - [AdminClient
>>> clientId=tf_25464-enumerator-admin-client] Node 4 disconnected.
>>> 2023-08-18 15:19:16,668 INFO  org.apache.kafka.clients.NetworkClient
>>>   [] - [AdminClient
>>> clientId=cpu_59942-enumerator-admin-client] Node 1 disconnected.
>>> 2023-08-18 15:19:16,764 INFO  org.apache.kafka.clients.NetworkClient
>>>   [] - [AdminClient
>>> clientId=cpu_55128-enumerator-admin-client] Node 3 disconnected.
>>> 2023-08-18 15:19:27,913 WARN  akka.remote.transport.netty.NettyTransport
>>>   [] - Remote connection to [/10.11.0.51:42778] failed
>>> with java.io.IOException: Connection reset by peer
>>> 2023-08-18 15:19:27,963 WARN  akka.remote.ReliableDeliverySupervisor
>>>   [] - Association with remote system
>>> [akka.tcp://flink@tef-prod-flink-04:38835] has failed, address is now
>>> gated for [50] ms. Reason: [Disassociated]
>>> 2023-08-18 15:19:27,967 WARN  akka.remote.ReliableDeliverySupervisor
>>>   [] - Association with remote system
>>> [akka.tcp://flink-metrics@tef-prod-flink-04:46491] has failed, address
>>> is now gated for [50] ms. Reason: [Disassociated]
>>> 2023-08-18 15:19:29,225 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
>>> RouterReplacementAlgorithm -> kafkaSink_sinkFaultyRouter_windowMode: Writer
>>> -> kafkaSink_sinkFaultyRouter_windowMode: Committer (3/4)
>>> (f6fd65e3fc049bd9021093d8f532bbaf_a47f4a3b960228021159de8de51dbb1f_2_0)
>>> switched from RUNNING to FAILED on
>>> injection-assia-3-pro-cloud-tef-gcp-europe-west1:39011-b24b1d @
>>> injection-assia-3-pro-cloud-tef-gcp-europe-west1 (dataPort=35223).
>>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>>> Connection unexpectedly closed by remote task manager 'tef-prod-flink-04/
>>> 10.11.0.51:37505 [ tef-prod-flink-04:38835-e3ca4d ] '. This might
>>> indicate that the remote task manager was lost.
>>> at
>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134)
>>> ~[flink-dist-1.16.2.jar:1.16.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>>> ~[flink-dist-1.16.2.jar:1.16.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>>> ~[flink-dist-1.16.2.jar:1.16.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>>> ~[flink-dist-1.16.2.jar:1.16.2]
>>> at
>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>>> ~[flink-dist-1.16.2.jar:1.16.2]
>>>

Re: Flink throws exception when submitting a job through Jenkins and Spinnaker

2023-08-18 Thread Shammon FY
Hi Elakiya,

In general I think there are two steps to start a job: launch the JM node,
including the dispatcher and resource manager, and then submit the SQL job to the
dispatcher. The dispatcher will launch a rest server, and the client will
connect to the rest server to submit a job.

From your error message, I found the timeout exception is thrown from
`RestClusterClient`, which is used to submit jobs to the rest server in the
`Dispatcher`. So I suspect that the address or port for the Dispatcher is
incorrect, causing the rest connection to time out. You can check the
configuration for the rest server or check whether the dispatcher is
started successfully.

Best,
Shammon FY

On Wed, Aug 16, 2023 at 2:55 PM elakiya udhayanan 
wrote:

> Hi Shammon,
>
> Thanks for your response.
>
> If it is a network issue as you have mentioned, how does it read the
> contents of the jar file? We can see that the code is read, and it throws an
> error only when executing the SQL. Also, can you let us know exactly what
> address could be wrong here, so that we can correct it from our end.
> My other doubt is whether we should port-forward the job manager (is it
> necessary when using Kubernetes standalone) before submitting the job using
> the run command.
>
> Thanks,
> Elakiya
>
> On Mon, Aug 14, 2023 at 11:15 AM Shammon FY  wrote:
>
>> Hi,
>>
>> It seems that the client cannot access the right network to submit your
>> job; maybe the address option in k8s is wrong, and you can check the error
>> message in the k8s log
>>
>> Best,
>> Shammon FY
>>
>> On Fri, Aug 11, 2023 at 11:40 PM elakiya udhayanan 
>> wrote:
>>
>>>
>>> Hi Team,
>>> We are using Apache Flink 1.16.1 configured as a standalone Kubernetes
>>> pod ,for one of our applications to read from confluent Kafka topics to do
>>> event correlation. We are using the flink's Table API join for the same (in
>>> SQL format).We are able to submit the job using the flink's UI. For our DEV
>>> environment , we implemented a jenkins pipeline, which downloads the jar
>>> that is required to submit the job and also creates the flink kubernetes
>>> pods and copy the downloaded jar to the flink pod's folder and uses the
>>> flink's run command to submit the job.The deployment step happens through
>>> the spinnaker webhook. We use a docker file to create the kubernetes pods,
>>> also have a docker-entrypoin.sh which has the flink run command to submit
>>> the job.
>>>
>>> Everything works fine, but when the job is getting submitted , we get
>>> the below exception.
>>>
>>> The flink run command used is
>>>
>>> *flink run  /opt/flink/lib/application-0.0.1.jar*
>>> Any help is appreciated.
>>>
>>> 
>>>  The program finished with the following exception:
>>>
>>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>>> caused an error: Failed to execute sql
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>>> at 
>>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:843)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1087)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1165)
>>> at 
>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1165)
>>> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:867)
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827)
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
>>> at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
>>

Re: 404 Jar File Not Found w/ Web Submit Disabled

2023-08-14 Thread Shammon FY
Hi,

Currently you can upload a jar job to a Flink session cluster, or submit a
job graph to the session cluster with the REST API. For example, you can submit SQL
jobs with the jdbc-driver to the sql-gateway; the gateway will then build the job graph
and submit it to the session cluster via the rest endpoint.

If you configure `web.submit.enable=false` in your cluster, you cannot
upload a jar job, but you can still submit jobs via the rest endpoint. You can
create your own `RestClusterClient` to do that or use the existing
jdbc-driver and sql-gateway.
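
For illustration, a hedged sketch of submitting a job graph to a session cluster through `RestClusterClient`; the address, port and cluster id are placeholders, and the example job only uses classes that are already on the cluster's classpath:

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestSubmitExample {
    public static void main(String[] args) throws Exception {
        // Build a trivial job locally and turn it into a job graph.
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").print();
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();

        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // placeholder
        conf.setInteger(RestOptions.PORT, 8081);          // placeholder

        try (RestClusterClient<String> client =
                 new RestClusterClient<>(conf, "session-cluster")) {
            JobID jobId = client.submitJob(jobGraph).get();
            System.out.println("Submitted job " + jobId);
        }
    }
}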


Best,
Shammon FY

On Tue, Aug 15, 2023 at 12:14 AM patricia lee  wrote:

> Hi,
>
> Just to add, when I set back to "true" the web.ui submit property, that is
> when the rest endpoint /jars/upload worked again. But in the documentation
> reference:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/
>
>
> Disabling the UI doesn't disable the endpoint. Is this the expected
> behavior?
>
> Regards,
> Patricia
>
> On Mon, Aug 14, 2023, 5:07 PM patricia lee  wrote:
>
>> Hi,
>>
>> I set web.ui.submit=false; after that, uploading jar files via the
>> rest endpoint started throwing 404. The documentation says:
>>
>> "Even it is disabled sessions clusters still accept jobs through REST
>> requests (Http calls). This flag only guards the feature to upload jobs in
>> the UI"
>>
>> I also set the io.tmp.dirs to my specified directory.
>>
>>
>> But I can no longer upload jar via rest endpoint.
>>
>>
>> Regards,
>> Patricia
>>
>


Re: Flink throws exception when submitting a job through Jenkins and Spinnaker

2023-08-13 Thread Shammon FY
Hi,

It seems that the client cannot access the right network to submit your
job; maybe the address option in k8s is wrong, and you can check the error
message in the k8s log

Best,
Shammon FY

On Fri, Aug 11, 2023 at 11:40 PM elakiya udhayanan 
wrote:

>
> Hi Team,
> We are using Apache Flink 1.16.1 configured as a standalone Kubernetes pod
> ,for one of our applications to read from confluent Kafka topics to do
> event correlation. We are using the flink's Table API join for the same (in
> SQL format).We are able to submit the job using the flink's UI. For our DEV
> environment , we implemented a jenkins pipeline, which downloads the jar
> that is required to submit the job and also creates the flink kubernetes
> pods and copy the downloaded jar to the flink pod's folder and uses the
> flink's run command to submit the job.The deployment step happens through
> the spinnaker webhook. We use a docker file to create the kubernetes pods,
> also have a docker-entrypoin.sh which has the flink run command to submit
> the job.
>
> Everything works fine, but when the job is getting submitted , we get the
> below exception.
>
> The flink run command used is
>
> *flink run  /opt/flink/lib/application-0.0.1.jar*
> Any help is appreciated.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute sql
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:843)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1087)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1165)
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1165)
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:867)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
> at com.sample.SampleStreamingApp.main(SampleStreamingApp.java:157)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> ... 8 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
> 'insert-into_default_catalog.default_database.Sample'.
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
> at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:206)
> at 
> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
> at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:850)
> ... 17 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
> 15:06:44.964 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error 
> while running the command.
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: Failed to execute sql
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>  ~[application-0.0.1.jar:?]
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>  ~[a

Re: Flink operator job restart

2023-08-10 Thread Shammon FY
Hi Ethan:

You can restart jobs from a specified checkpoint directory as described in [1]. But
generally we restart jobs from a savepoint; you can refer to [2] for
more information.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#resuming-from-a-retained-checkpoint
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/

Best,
Shammon FY



On Thu, Aug 10, 2023 at 2:43 PM liu ron  wrote:

> Hi, Ivan
>
> You can refer to part five of [1], which describes how to restore the job.
>
> [1]
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/upgrade/#2-upgrading-with-existing-flinkdeployments
>
> Best,
> Ron
>
> Ethan T Yang  wrote on Thu, Aug 10, 2023 at 12:28:
>
>> Hi Flink users,
>>
>> When using the Flink operator, how can I restart jobs from a checkpoint? I am
>> used to the Flink Kubernetes native deployment, where I can run
>>
>> flink run -s
>>
>> How can I achieve that with the Flink operator?
>>
>> Thanks
>> Ivan
>
>


Re: Flink consuming MySQL

2023-08-07 Thread Shammon FY
As mentioned above, directly using CDC is probably the better solution at the moment. Reading the data yourself brings many problems, such as how to read updated rows, how to read incremental data, how to handle failover, and so on; using CDC directly is the most convenient.
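
For reference, a minimal sketch of reading one of the MySQL tables with the flink-connector-mysql-cdc `MySqlSource`; all connection settings below are placeholders:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlCdcExample {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("localhost")          // placeholder
                .port(3306)
                .databaseList("mydb")           // placeholder
                .tableList("mydb.actiontype")   // placeholder
                .username("user")
                .password("password")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing lets the CDC source track its binlog position across failover.
        env.enableCheckpointing(10_000);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
           .print();
        env.execute("mysql-cdc-example");
    }
}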

Best,
Shammon FY

On Tue, Aug 8, 2023 at 11:30 AM Jiabao Sun 
wrote:

> Hi,
>
> You can try using flink-cdc-connectors to do the real-time join.
> A regular join needs to keep the complete state of both tables; if the tables are large, the RocksDB state backend is recommended.
> If the table being joined does not change much, you can consider a lookup join.
>
> Best,
> Jiabao
>
>
> > On Aug 8, 2023 at 11:10, 小昌同学  wrote:
> >
> > Thanks for the guidance.
> >
> > My current requirement is to read the data of two MySQL tables and then join them in real time. What I can think of is either reading them in real time with CDC, or writing a loop that reads the data from MySQL.
> > Do you have any better suggestions for this part?
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
> >  Original message 
> > | From | Shammon FY |
> > | Date | 2023-08-08 10:37 |
> > | To |  |
> > | Subject | Re: Flink consuming MySQL |
> > Hi,
> >
> > The ResultSet in your code needs to be closed after it has been read, to avoid a resource leak.
> >
> > As for the program ending once the MySQL data has been read, which part exactly do you mean? MySQL here is a bounded
> > source; once the data is consumed and the whole job finishes its computation, the job ends. This is normal.
> >
> > Best,
> > Shammon FY
> >
> > On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:
> >
> > Hello everyone,
> >
> > I read data from MySQL locally by extending the RichSourceFunction class, but why does the program stop automatically after it has fetched all the data from MySQL?
> > My code is below:
> > |
> > public class MysqlSource2 extends RichSourceFunction<ActionType> {
> > PreparedStatement ps;
> > private Connection connection;
> >
> > @Override
> > public void open(Configuration parameters) throws Exception {
> > super.open(parameters);
> > connection = getConnection();
> > String sql="select * from actiontype;";
> > ps = connection.prepareStatement(sql);
> > }
> >
> > private static Connection getConnection(){
> > Connection con=null;
> > String driverClass= FlinkConfig.config.getProperty("driverClass");
> > String url=FlinkConfig.config.getProperty("jdbcUrl");
> > String user=FlinkConfig.config.getProperty("jdbcUser");
> > String passWord=FlinkConfig.config.getProperty("passWord");
> >
> > try {
> > Class.forName(driverClass);
> > con= DriverManager.getConnection(url,user,passWord);
> > } catch (Exception e) {
> > throw new RuntimeException(e);
> > }
> > return con;
> > }
> >
> > @Override
> > public void run(SourceContext<ActionType> ctx) throws Exception {
> > ResultSet resultSet = ps.executeQuery();
> > while (resultSet.next()){
> > ActionType actionType = new ActionType(
> > resultSet.getString("action"),
> > resultSet.getString("action_name")
> > );
> > ctx.collect(actionType);
> > }
> > }
> >
> > @Override
> > public void close() throws Exception {
> > super.close();
> > if (null!=connection){
> > connection.close();
> > }
> > if (null!=ps){
> > ps.close();
> > }
> > }
> >
> > @Override
> > public void cancel() {
> > }
> > };
> >
> >
> > |
> >
> >
> > | |
> > 小昌同学
> > |
> > |
> > ccc0606fight...@163.com
> > |
>
>


Re: Flink consuming MySQL

2023-08-07 Thread Shammon FY
Hi,

The ResultSet in your code needs to be closed after it has been read, to avoid a resource leak.

As for the program ending once the MySQL data has been read, which part exactly do you mean? MySQL here is a bounded
source; once the data is consumed and the whole job finishes its computation, the job ends. This is normal.

Best,
Shammon FY

On Mon, Aug 7, 2023 at 5:04 PM 小昌同学  wrote:

> Hello everyone,
> I read data from MySQL locally by extending the RichSourceFunction class, but why does the program stop automatically after it has fetched all the data from MySQL?
> My code is below:
> |
> public class MysqlSource2 extends RichSourceFunction<ActionType> {
> PreparedStatement ps;
> private Connection connection;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> connection = getConnection();
> String sql="select * from actiontype;";
> ps = connection.prepareStatement(sql);
> }
>
> private static Connection getConnection(){
> Connection con=null;
> String driverClass= FlinkConfig.config.getProperty("driverClass");
> String url=FlinkConfig.config.getProperty("jdbcUrl");
> String user=FlinkConfig.config.getProperty("jdbcUser");
> String passWord=FlinkConfig.config.getProperty("passWord");
>
> try {
> Class.forName(driverClass);
> con= DriverManager.getConnection(url,user,passWord);
> } catch (Exception e) {
> throw new RuntimeException(e);
> }
> return con;
> }
>
> @Override
> public void run(SourceContext<ActionType> ctx) throws Exception {
> ResultSet resultSet = ps.executeQuery();
> while (resultSet.next()){
> ActionType actionType = new ActionType(
> resultSet.getString("action"),
> resultSet.getString("action_name")
> );
> ctx.collect(actionType);
> }
> }
>
> @Override
> public void close() throws Exception {
> super.close();
> if (null!=connection){
> connection.close();
> }
> if (null!=ps){
> ps.close();
> }
> }
>
> @Override
> public void cancel() {
> }
> };
>
>
> |
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: flink 1.14.5 sql-client cannot query hbase 1.4.3 data

2023-08-07 Thread Shammon FY
Hi,

It looks like a version conflict. Did you put HBase jars in your Flink session cluster directory? You can check whether their version is
consistent with the HBase version shaded by flink-hbase.

Best,
Shammon FY

On Sat, Aug 5, 2023 at 9:33 PM 杨东树  wrote:

> Hi all,
> Currently, when querying HBase data with sql-client, the query does not succeed. Please advise, thanks.
> Steps to reproduce:
> 1. HBase operations:
> hbase(main):005:0> create 'flink_to_hbase','cf1'
> 0 row(s) in 2.2900 seconds
> hbase(main):006:0> put 'flink_to_hbase', 'rk0001', 'cf1:username',
> 'zhangsan'
> 0 row(s) in 0.0510 seconds
>
>
> 2. Flink operations:
> ./start-cluster.sh
> ./sql-client.sh
> CREATE TABLE flink_to_hbase(
> rowkey STRING,
> cf1 ROW,
> PRIMARY KEY (rowkey) NOT ENFORCED
> )WITH(
> 'connector'='hbase-1.4',
> 'table-name'='flink_to_hbase',
> 'zookeeper.quorum'='192.168.21.128:2181',
> 'zookeeper.znode.parent'='/hbase'
> );
>
>
> 3. Flink error log:
> 2023-08-05 21:00:35,081 INFO  org.apache.flink.table.client.cli.CliClient
> [] - Command history file path: /root/.flink-sql-history
> 2023-08-05 21:00:52,011 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:52,026 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (0c147bc0da5a43a5a382f2ec20740b45).
> 2023-08-05 21:00:52,480 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (0c147bc0da5a43a5a382f2ec20740b45) to '
> http://localhost:8081'.
> 2023-08-05 21:00:55,809 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:00:55,830 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,481 INFO
> org.apache.flink.configuration.Configuration [] - Config
> uses fallback configuration key 'jobmanager.rpc.address' instead of key
> 'rest.address'
> 2023-08-05 21:07:52,484 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Submitting job 'collect' (d29904103fa3c83e3089c09f093372c9).
> 2023-08-05 21:07:52,728 INFO
> org.apache.flink.client.program.rest.RestClusterClient   [] -
> Successfully submitted job 'collect' (d29904103fa3c83e3089c09f093372c9) to '
> http://localhost:8081'.
> 2023-08-05 21:07:55,972 WARN  org.apache.flink.table.client.cli.CliClient
> [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:211)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:231)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:532)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:423)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_231]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1274)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]

Re: Severe full GC in a job

2023-08-03 Thread Shammon FY
Hi,

Generally you need to confirm which part is causing the full GC, for example metaspace or an oversized heap. If it is caused by the heap being too large, you can dump the heap and use analysis tools such as MAT or VisualVM to check which objects take up the most space and whether there is a memory leak or another cause.

Best,
Shammon FY

On Fri, Aug 4, 2023 at 10:00 AM yidan zhao  wrote:

> Check the GC log for the cause of the GC.
>
> 2278179732 <2278179...@qq.com.invalid> wrote on Thu, Aug 3, 2023 at 13:59:
> >
> > Hi everyone, my job occasionally shows backpressure after running for a while, and the full GC looks severe. Are there any good tools to troubleshoot this, or any experience documents? Thanks!
>


Re: Flink netty connector for TCP source

2023-08-02 Thread Shammon FY
Yes, you are right !

On Thu, Aug 3, 2023 at 1:35 PM Kamal Mittal 
wrote:

> Hello Shammon,
>
>
>
> As said, one split enumerator per source means multiple sub-tasks
> of that source (if parallelism > 1) will use the same split enumerator instance,
> right?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 03 August 2023 10:54 AM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> I think it depends on your practical application. In general, the built-in
> sources in Flink such as kafka or hive proactively fetch splits from the
> source, instead of starting a service and having the source push data to it.
>
>
>
> Returning to the issue of port conflicts, you may need to check if the
> specified port in the split enumerator is being used by other services, or
> if random ports can be used. Generally speaking, there will only be one
> split enumerator for one source, but when the job master fails to restart,
> the split enumerator of the previous job may not be completely shutdown. If
> they use the same port, it may also cause conflicts.
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Thu, Aug 3, 2023 at 11:57 AM Kamal Mittal 
> wrote:
>
> Hello Shammon,
>
>
>
> Please have a look for below and share views.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 02 August 2023 08:02 AM
> *To:* Shammon FY ; user@flink.apache.org
> *Subject:* RE: Flink netty connector for TCP source
>
>
>
> Thanks Shammon.
>
>
>
> Purpose of opening server socket in Split Enumerator was that it has only
> one instance per source and so the server socket too (port binding can
> happen only once). And then accepted Socket connections
> (serversocket.accept()) will act as splits which will be further processed
> by readers.
>
>
>
> Let me know please if there is issue you see in above understanding or
> some other way you can suggest. Issue is that server socket can only bind a
> port once over a machine node and that’s why we thought to open it in split
> enumerator which is called only once and per source.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 02 August 2023 07:48 AM
> *To:* Kamal Mittal ; user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> It confuses me a little that what's the purpose of opening a server socket
> in SplitEnumerator? Currently there will be only one SplitEnumerator
> instance in JobManager for each one source not each source subtask. If
> there's only one source in your job, no matter how much parallelism this
> source has, there will be only one SplitEnumerator instance in JM which can
> connect and send events to the source subtasks.
>
>
>
> Additionally, if you want to read data from a netty source, you can
> implement your NettySplitEnumerator to get splits from netty source and
> assign them to a NettySourceReader which will read data from netty source
> according to the splits.
>
>
>
> Best,
>
> Shammon FY
>
>
>
>
>
> On Tue, Aug 1, 2023 at 12:02 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> I created a custom server socket source in which opened server socket in
> split enumerator, source parallelism is =2 and it gives error while running
> - “Address is already in use” i.e. it tried to open two server sockets as
> per parallelism which is not correct as parallelism is applicable for
> source operator and not for split enumerator?
>
>
>
> Please correct me if above understanding is not correct.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Hang Ruan 
> *Sent:* 01 August 2023 08:55 AM
> *To:* Kamal Mittal 
> *Cc:* liu ron ; user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal.
>
>
>
> The SplitEnumerator is contained in the SourceCoordinator. They are only
> used in JM.
>
>
>
> Best,
>
> Hang
>
>
>
> Kamal Mittal via user  wrote on Tue, Aug 1, 2023 at 10:43:
>
> Thanks.
>
>
>
> I looked at the link for custom data sources, one query here that how to
> make sure for Split enumerator to execute on Job Manager rather than at
> Task manager?
>
>
>
> *From:* liu ron 
> *Sent:* 31 July 2023 10:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal
>
> Currently, we don't provide the connector like TCP  source in Flink main

Re: File Source Exactly Once Delivery Semantics

2023-08-02 Thread Shammon FY
Hi Kirti,

Simply speaking, the sink needs to support two-phase commit: the sink can
`write` data as normal but only `commit` the data after the checkpoint is
successful. This ensures that even if a failover occurs and data needs to
be replayed, the previously written data is not visible to the
user. However, this approach increases data latency: the data is only
visible after the checkpoint is completed and the data is committed, rather
than immediately after the sink writes it.
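
As an illustration only (not necessarily your setup), here is a sketch of a sink that takes part in checkpointing this way: Flink's KafkaSink with EXACTLY_ONCE writes records inside a Kafka transaction and only commits them when the checkpoint completes. Broker, topic and transactional-id prefix below are placeholders.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSinkExample {
    public static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")           // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")             // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Data is committed only when the enclosing checkpoint completes.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("file-source-pipeline")
                .build();
    }
}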

Best,
Shammon FY

On Thu, Aug 3, 2023 at 12:23 PM Kirti Dhar Upadhyay K via user <
user@flink.apache.org> wrote:

> Hi Team,
>
>
>
> I am using Flink File Source in one of my use cases.
>
> I observed that, while reading a file, the source reader stores its
> position in the checkpointed data.
>
> In case the application crashes, it restores its position from the checkpointed
> data once the application comes up, which may result in re-emitting a few
> records that were emitted between the last checkpoint and the application
> crash.
>
> Whereas in the doc link
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/guarantees/
> I found that the File source ensures exactly-once delivery semantics with the help
> of the data sink.
>
> *“**To guarantee end-to-end exactly-once record delivery (in addition to
> exactly-once state semantics), the data sink needs to take part in the
> checkpointing mechanism.”*
>
>
>
>
>
> Can someone put some light on this?
>
>
>
> Regards,
>
> Kirti Dhar
>
>
>


Re: Flink netty connector for TCP source

2023-08-02 Thread Shammon FY
Hi Kamal,

I think it depends on your practical application. In general, the built-in
sources in Flink such as kafka or hive proactively fetch splits from the
source, instead of starting a service and having the source push data to it.

Returning to the issue of port conflicts, you may need to check whether the
specified port in the split enumerator is being used by other services, or
whether random ports can be used. Generally speaking, there will only be one
split enumerator for one source, but when the job master fails over and restarts,
the split enumerator of the previous job may not be completely shut down. If
they use the same port, it may also cause conflicts.

Best,
Shammon FY

On Thu, Aug 3, 2023 at 11:57 AM Kamal Mittal 
wrote:

> Hello Shammon,
>
>
>
> Please have a look for below and share views.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 02 August 2023 08:02 AM
> *To:* Shammon FY ; user@flink.apache.org
> *Subject:* RE: Flink netty connector for TCP source
>
>
>
> Thanks Shammon.
>
>
>
> Purpose of opening server socket in Split Enumerator was that it has only
> one instance per source and so the server socket too (port binding can
> happen only once). And then accepted Socket connections
> (serversocket.accept()) will act as splits which will be further processed
> by readers.
>
>
>
> Let me know please if there is issue you see in above understanding or
> some other way you can suggest. Issue is that server socket can only bind a
> port once over a machine node and that’s why we thought to open it in split
> enumerator which is called only once and per source.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 02 August 2023 07:48 AM
> *To:* Kamal Mittal ; user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> It confuses me a little that what's the purpose of opening a server socket
> in SplitEnumerator? Currently there will be only one SplitEnumerator
> instance in JobManager for each one source not each source subtask. If
> there's only one source in your job, no matter how much parallelism this
> source has, there will be only one SplitEnumerator instance in JM which can
> connect and send events to the source subtasks.
>
>
>
> Additionally, if you want to read data from a netty source, you can
> implement your NettySplitEnumerator to get splits from netty source and
> assign them to a NettySourceReader which will read data from netty source
> according to the splits.
>
>
>
> Best,
>
> Shammon FY
>
>
>
>
>
> On Tue, Aug 1, 2023 at 12:02 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> I created a custom server socket source in which opened server socket in
> split enumerator, source parallelism is =2 and it gives error while running
> - “Address is already in use” i.e. it tried to open two server sockets as
> per parallelism which is not correct as parallelism is applicable for
> source operator and not for split enumerator?
>
>
>
> Please correct me if above understanding is not correct.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Hang Ruan 
> *Sent:* 01 August 2023 08:55 AM
> *To:* Kamal Mittal 
> *Cc:* liu ron ; user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal.
>
>
>
> The SplitEnumerator is contained in the SourceCoordinator. They are only
> used in JM.
>
>
>
> Best,
>
> Hang
>
>
>
> Kamal Mittal via user  wrote on Tue, Aug 1, 2023 at 10:43:
>
> Thanks.
>
>
>
> I looked at the link for custom data sources, one query here that how to
> make sure for Split enumerator to execute on Job Manager rather than at
> Task manager?
>
>
>
> *From:* liu ron 
> *Sent:* 31 July 2023 10:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal
>
> Currently, we don't provide the connector like TCP  source in Flink main
> repo. If you need this connector, you can try to implement it refer to the
> FLIP-27 source docs
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>
>
>
> Best,
>
> Ron
>
>
>
> Shammon FY  wrote on Thu, Jul 27, 2023 at 11:23:
>
> I cannot find any information about a netty source on the Flink website and it
> is not in the connector list [1], so I think that it is not supported by the
> flink community
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
>
>
>
> Best,
>
> Shammon FY
>
>
&g

Re: Apache Atlas - Flink Integration

2023-08-01 Thread Shammon FY
Hi arjun,

As @Mate mentioned, the discussion of FLIP-314 has been completed and a
vote will be initiated soon. We would like to introduce the interfaces for
lineage in the next release of Flink after 1.18

Best,
Shammon FY


On Tue, Aug 1, 2023 at 11:07 PM Mate Czagany  wrote:

> Hi,
>
> Unfortunately the Atlas hook you've read about is only available in the
> Cloudera Flink solution and has not been made open-source.
>
> In the future FLIP-314[1] might offer a simple solution to implement the
> Atlas integration.
>
> Best Regards,
> Mate
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-314%3A+Support+Customized+Job+Lineage+Listener
>
>
> arjun s  wrote on Tue, Aug 1, 2023 at 16:23:
>
>> I am looking to integrate Apache Atlas with Apache Flink to capture job
>> lineage. I found some references around it from Cloudera (CDP), where they are
>> using an Atlas-Flink hook, but I am not able to find any documentation or
>> implementation.
>>
>> I have gone through the JIRA link mentioned below, but in this link the
>> feature is still open.
>>
>> https://issues.apache.org/jira/browse/FLINK-6757
>>
>> I would like to know whether this Apache Atlas integration with Apache
>> Flink has been released or not. If yes, could anyone share references
>> for integrating it?
>>
>> Thanks and Regards,
>> Arjun S
>>
>


Re: No. of task managers vs No. of task slots

2023-08-01 Thread Shammon FY
Hi Kamal,

Regarding the third of your three points:
> 3. What is the difference between high no. of task managers vs high no.
of task slots (with low no. of task managers)?

I think this is mainly limited by the JVM's efficiency in managing memory.
When we use a Flink session cluster as an OLAP engine, we found that when the
memory of a single TM exceeds a certain threshold, such as 50 GB or 100 GB,
performance may fall back due to GC and other issues. We currently do not
have time to pinpoint the specific reason. So from this perspective, I
think memory limits the number of computing tasks per TM, and the number of
tasks per TM also limits the number of slots.

Best,
Shammon FY

On Tue, Aug 1, 2023 at 4:22 PM liu ron  wrote:

> Hi, Kamal
>
> > How many task managers a job manager can handle? Is there any upper
> limit also?
>
> There is no clear limit to how many TMs a JM can cover, and based on my
> past experience, it can handle TMs over 1000+, even more.
>
> > How to decide no. of task managers, is there any way?
>
> I don't think there is a specific formula here, but rather the number of
> TMs based on your actual business scenario.
>
> > What is the difference between high no. of task managers vs high no. of
> task slots (with low no. of task managers)?
>
> Flink uses the SlotSharingGroup mechanism: by default all the operators of
> the pipeline will run in one slot. This mechanism means all the
> operators share memory with each other, so there is
> competition, and all the slots of a TM also share the memory of the TM. If
> there are too many slots on a TM, they may interfere with each other and
> affect the stability of the job. If your job requires high stability, it
> may make more sense to take the high-number-of-TMs approach, with fewer slots per TM.
> However, too many TMs may lead to too much network transmission overhead,
> so if the latency requirement of the job is higher, it is more appropriate
> to adopt the high-number-of-slots approach.
>
> Best,
> Ron
>
>
> Kamal Mittal via user  wrote on Tue, Aug 1, 2023 at 14:21:
>
>> Hello Community,
>>
>>
>>
>> Need info. for below –
>>
>>
>>
>>1. How many task managers a job manager can handle? Is there any
>>upper limit also?
>>
>>
>>
>>1. How to decide no. of task managers, is there any way?
>>
>>
>>
>>1. What is the difference between high no. of task managers vs high
>>no. of task slots (with low no. of task managers)?
>>
>>
>>
>> Rgds,
>>
>> Kamal
>>
>


Re: Flink netty connector for TCP source

2023-08-01 Thread Shammon FY
Hi Kamal,

It confuses me a little: what is the purpose of opening a server socket
in SplitEnumerator? Currently there will be only one SplitEnumerator
instance in the JobManager for each source, not for each source subtask. If
there is only one source in your job, no matter how much parallelism this
source has, there will be only one SplitEnumerator instance in the JM, which can
connect and send events to the source subtasks.

Additionally, if you want to read data from a netty source, you can
implement your NettySplitEnumerator to get splits from netty source and
assign them to a NettySourceReader which will read data from netty source
according to the splits.

Best,
Shammon FY


On Tue, Aug 1, 2023 at 12:02 PM Kamal Mittal via user 
wrote:

> Hello,
>
>
>
> I created a custom server socket source in which opened server socket in
> split enumerator, source parallelism is =2 and it gives error while running
> - “Address is already in use” i.e. it tried to open two server sockets as
> per parallelism which is not correct as parallelism is applicable for
> source operator and not for split enumerator?
>
>
>
> Please correct me if above understanding is not correct.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Hang Ruan 
> *Sent:* 01 August 2023 08:55 AM
> *To:* Kamal Mittal 
> *Cc:* liu ron ; user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal.
>
>
>
> The SplitEnumerator is contained in the SourceCoordinator. They are only
> used in JM.
>
>
>
> Best,
>
> Hang
>
>
>
> Kamal Mittal via user  wrote on Tue, Aug 1, 2023 at 10:43:
>
> Thanks.
>
>
>
> I looked at the link for custom data sources, one query here that how to
> make sure for Split enumerator to execute on Job Manager rather than at
> Task manager?
>
>
>
> *From:* liu ron 
> *Sent:* 31 July 2023 10:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi, Kamal
>
> Currently, we don't provide the connector like TCP  source in Flink main
> repo. If you need this connector, you can try to implement it refer to the
> FLIP-27 source docs
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/sources/
>
>
>
> Best,
>
> Ron
>
>
>
> Shammon FY  wrote on Thu, Jul 27, 2023 at 11:23:
>
> I cannot find any information about a netty source on the Flink website and it
> is not in the connector list [1], so I think that it is not supported by the
> flink community
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal 
> wrote:
>
> Hello Shammon,
>
>
>
> Yes socket text stream I am aware of but was thinking if something like as
> ‘https://github.com/apache/bahir-flink/tree/master/flink-connector-netty’
> is also supported by Flink?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 27 July 2023 08:15 AM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> There's socket text stream in `DataStream` and you can refer to [1] for
> more details.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Does flink provides netty connector for custom TCP source?
>
>
>
> Any documentation details please share?
>
>
>
> Rgds,
>
> Kamal
>
>


Re: flink-job-history page freezes with too many jobs

2023-07-27 Thread Shammon FY
Hi,

You can control the number of stored jobs by configuring `jobstore.max-capacity` and `jobstore.expiration-time`; see [1] for the specific parameters.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options

Best,
Shammon FY

On Fri, Jul 28, 2023 at 10:17 AM 阿华田  wrote:

> Currently flink-job-history has already collected 5000+ jobs. When I click to view all
> jobs, the job-history page freezes and becomes inaccessible. Does anyone have a good solution?
> | |
> 阿华田
> |
> |
> a15733178...@163.com
> |
> Signature customized by NetEase Mail Master
>
>


Re: Flink netty connector for TCP source

2023-07-26 Thread Shammon FY
I cannot find any information about a netty source on the Flink website, and it is
not in the connector list [1], so I think it is not supported by the
Flink community.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/

Best,
Shammon FY

On Thu, Jul 27, 2023 at 10:53 AM Kamal Mittal 
wrote:

> Hello Shammon,
>
>
>
> Yes socket text stream I am aware of but was thinking if something like as
> ‘https://github.com/apache/bahir-flink/tree/master/flink-connector-netty’
> is also supported by Flink?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 27 July 2023 08:15 AM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink netty connector for TCP source
>
>
>
> Hi Kamal,
>
>
>
> There's socket text stream in `DataStream` and you can refer to [1] for
> more details.
>
>
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program
>
>
>
> Best,
>
> Shammon FY
>
>
>
> On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Does flink provides netty connector for custom TCP source?
>
>
>
> Any documentation details please share?
>
>
>
> Rgds,
>
> Kamal
>
>


Re: Flink netty connector for TCP source

2023-07-26 Thread Shammon FY
Hi Kamal,

There's socket text stream in `DataStream` and you can refer to [1] for
more details.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/#example-program

Best,
Shammon FY

On Wed, Jul 26, 2023 at 4:26 PM Kamal Mittal via user 
wrote:

> Hello,
>
>
>
> Does flink provides netty connector for custom TCP source?
>
>
>
> Any documentation details please share?
>
>
>
> Rgds,
>
> Kamal
>


Re: High IO caused by JdbcSink

2023-07-25 Thread Shammon FY
Hi,

Currently JdbcSink creates a PreparedStatement for each sink. When processing data in batches, it first calls the PreparedStatement's addBatch() method to put the data into a buffer, and once the flush condition is reached it calls executeBatch() to send the data to the JDBC
server in one batch, which saves network IO.

On the database side, I understand that executing `insert  into error_md5_info (action, serverIp,
handleSerialno,md5Num,insertTime,dateTime) values
(1,2,3,4,5,6),(1,2,3,4,9,10);` versus inserting the two rows separately probably makes little difference to the underlying storage, because the amount of inserted data does not decrease. You can observe this in practice.

Best,
Shammon FY


On Tue, Jul 25, 2023 at 4:02 PM 小昌同学  wrote:

> Hi everyone, I have a question about using Flink's JdbcSink:
>
> My code is below. In my code each batch inserts 1000 rows, or data is flushed every 200 ms. But because our MySQL resources are limited, the insert IO looks too high on the monitoring page. With an insert statement like this, when 1000 rows have accumulated, what is the resulting format?
> Is it
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values
> (1,2,3,4,5,6),(1,2,3,4,9,10);
> or is it
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,5,6)
> insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (1,2,3,4,9,10)
> If it is the second case, is there any way to convert it to the first case? That would greatly reduce the IO.
>
>
>
> |
> errorStream.addSink(JdbcSink.sink(
> "insert  into error_md5_info (action, serverIp,
> handleSerialno,md5Num,insertTime,dateTime) values (?,?,?,?,?,?)",
> (statement, result) -> {
> statement.setString(1,result.getAction());
> statement.setString(2,result.getServerIp());
> statement.setString(3,result.getHandleSerialno());
> statement.setString(4,result.getMd5Num());
> statement.setString(5,result.getInsertTime());
> statement.setString(6, result.getDateTime());
> },
> JdbcExecutionOptions.builder()
> .withBatchSize(1000)
> .withBatchIntervalMs(200)
> .withMaxRetries(5)
> .build(),
> new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>
> .withUrl("jdbc:mysql://111/data_ret_log?useSSL=false=false=false=utf-8=convertToNull=true=Asia/Shanghai=true")
> .withDriverName("com.mysql.jdbc.Driver")
> .withUsername("111")
> .withPassword("222")
> .build()
> )).name("sink-error-mysql");
> |
> 小昌同学
> ccc0606fight...@163.com


Re: Computing aggregates over batch data with the DataStream API

2023-07-25 Thread Shammon FY
Hi,

The usage is the same as the DataStream API in a regular streaming job; you
just need to set the runtime mode to BATCH. In batch mode Flink emits only
the final result instead of intermediate results. You can refer to the
WordCount example in Flink [1]

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
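
As a minimal sketch (a bounded in-memory source stands in for your real
input), running the DataStream job in BATCH mode makes it emit only the
final average:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedAverageExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In BATCH mode, keyed aggregations emit only the final result per key
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3, 4, 5)
           .map(v -> Tuple2.of(v, 1))
           .returns(Types.TUPLE(Types.INT, Types.INT))
           .keyBy(t -> "all")                                        // single global key
           .reduce((a, b) -> Tuple2.of(a.f0 + b.f0, a.f1 + b.f1))    // running (sum, count)
           .map(t -> (double) t.f0 / t.f1)                           // final average
           .returns(Types.DOUBLE)
           .print();

        env.execute("bounded-average");
    }
}
```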


On Wed, Jul 26, 2023 at 9:10 AM Liu Join  wrote:

> For example: I am using the DataStream API to compute the average over batch data, i.e. a bounded stream. How can I output only the final average value instead of the intermediate values?
>


Re: kafka sink

2023-07-23 Thread Shammon FY
Hi nick,

Is there any error log? That may help to analyze the root cause.

On Sun, Jul 23, 2023 at 9:53 PM nick toker  wrote:

> hello
>
>
> we replaced the deprecated Kafka producer with the Kafka sink,
> and from time to time when we submit a job it gets stuck for 5 min in
> initializing (on the sink operators).
> we verified that the transaction prefix is unique.
>
> this did not happen when we used the Kafka producer.
>
> What can be the reason?
>
>


Re: flink1.17.1: MiniCluster is not yet running or has already been shut down.

2023-07-23 Thread Shammon FY
Hi,

Which example are you running? From the error, it looks like the MiniCluster was shut down while the result was being fetched from it.

Best,
Shammon FY

On Sat, Jul 22, 2023 at 3:25 PM guanyq  wrote:

> Running locally in IDEA: MiniCluster is not yet running or has already been shut down.
> What is the cause, and how should I handle it?
>
>
>
>
> 15:19:27,511 INFO
> org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] -
> Stopping resource manager service.
>
> 15:19:27,503 WARN
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] -
> Failed to get job status so we assume that the job has terminated. Some
> data might be lost.
>
> java.lang.IllegalStateException: MiniCluster is not yet running or has
> already been shut down.
>
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> ~[flink-core-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
> [flink-table-planner_25e35ab8-6377-4c6a-a928-a9fe1ff9e7f4.jar:1.17.1]
>
> at
> org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
> [flink-table-common-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> [flink-table-api-java-1.17.1.jar:1.17.1]


Re: About cluster.evenly-spread-out-slots

2023-07-20 Thread Shammon FY
Hi Kamal,

In general, tasks are scheduled in topological order, which means the
source tasks will be scheduled first and they will be deployed on different
TM if there are enough TMs. However, as Flink does not guarantee this, in
some special situations such as failover, multiple source tasks may be
scheduled to a single TM.

Best,
Shammon FY

On Fri, Jul 21, 2023 at 10:43 AM Kamal Mittal 
wrote:

> Hello Shammon,
>
>
>
> Thanks for quick reply.
>
>
>
> Is there any way to limit only one task executes on each task manager for
> an operator like source?
>
>
>
> If parallelism is set  equal to no. of task managers for an operator then
> it can work and keep cluster.evenly-spread-out-slots=true?
>
>
>
> Actually there is TCP server socket custom source created with parallelism
> 1 as TCP server socket can only bind once for a port and this limits this
> source task to execute only at one task manager which we want to avoid and
> want to scale it across task managers.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Shammon FY 
> *Sent:* 21 July 2023 07:41 AM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About cluster.evenly-spread-out-slots
>
>
>
> Hi Kamal,
>
>
>
> Even if `cluster.evenly-spread-out-slots` is set to true, Flink will not
> guarantee that same operator multiple tasks are never executed/scheduled on
> the same task manager, it just means Flink will use
> `LeastUtilizationSlotMatchingStrategy` to find the matching slot for the
> task.
>
>
>
> As we know there will be multiple task managers with multiple slots in
> Flink cluster, there will be two strategy to find the matching slot for
> task according to its resource requirement:
>
>
>
> 1> Use AnyMatchingSlotMatchingStrategy as default, it will traverse all
> free slots until it finds one slot which meets the resource requirement for
> the task.
>
> 2>  Use LeastUtilizationSlotMatchingStrategy when
> `cluster.evenly-spread-out-slots` is true, it will calculate utilization
> according the registered slot number, free slot number for each task
> manager and find the matching slot from the task manager with min
> utilization.
>
>
>
> Back to your example, if all slots in TM1 and TM2 match the task
> requirements, there will be one free slot in TM1 and TM2 after your job is
> scheduled if `cluster.evenly-spread-out-slots` is set to true.
>
>
>
> Best,
>
> Shammon FY
>
>
>
>
>
> On Thu, Jul 20, 2023 at 3:47 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> If property “cluster.evenly-spread-out-slots” is set to TRUE then Flink
> guarantees that same operator multiple tasks are never executed/scheduled
> on same task manager? Definitely this will depend upon parallelism value
> used for an operator and no. of task slots available.
>
>
>
> Like in below diagram both operators has parallelism value as 2 and no. of
> task slots available on each task manager are 3 each. So allocation of task
> slots will be as below and one task slot on each task manager will remain
> free?
>
> [diagram image not preserved in the archive]
>
> Rgds,
>
> Kamal
>
>


Re: About cluster.evenly-spread-out-slots

2023-07-20 Thread Shammon FY
Hi Kamal,

Even if `cluster.evenly-spread-out-slots` is set to true, Flink will not
guarantee that same operator multiple tasks are never executed/scheduled on
the same task manager, it just means Flink will use
`LeastUtilizationSlotMatchingStrategy` to find the matching slot for the
task.

As we know, there will be multiple task managers with multiple slots in a
Flink cluster, and there are two strategies to find the matching slot for a
task according to its resource requirement:

1> AnyMatchingSlotMatchingStrategy is used by default; it traverses all
free slots until it finds one slot which meets the resource requirement for
the task.
2> LeastUtilizationSlotMatchingStrategy is used when
`cluster.evenly-spread-out-slots` is true; it calculates the utilization of
each task manager from its registered and free slot numbers and finds the
matching slot from the task manager with the minimum utilization.

Back to your example, if all slots in TM1 and TM2 match the task
requirements, there will be one free slot in TM1 and TM2 after your job is
scheduled if `cluster.evenly-spread-out-slots` is set to true.
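
For reference, a rough flink-conf.yaml sketch (values are only illustrative).
Two parallel subtasks of the same operator never share a slot, so configuring
a single slot per TaskManager is one way to force at most one subtask of a
given operator onto each TaskManager:

```yaml
cluster.evenly-spread-out-slots: true
# optional: with one slot per TM, each TM runs at most one subtask of any given operator
taskmanager.numberOfTaskSlots: 1
```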

Best,
Shammon FY


On Thu, Jul 20, 2023 at 3:47 PM Kamal Mittal via user 
wrote:

> Hello,
>
>
>
> If property “cluster.evenly-spread-out-slots” is set to TRUE then Flink
> guarantees that same operator multiple tasks are never executed/scheduled
> on same task manager? Definitely this will depend upon parallelism value
> used for an operator and no. of task slots available.
>
>
>
> Like in below diagram both operators has parallelism value as 2 and no. of
> task slots available on each task manager are 3 each. So allocation of task
> slots will be as below and one task slot on each task manager will remain
> free?
>
> [diagram image not preserved in the archive]
>
> Rgds,
>
> Kamal
>


Re: Suggestions for Open Source FLINK SQL editor

2023-07-19 Thread Shammon FY
Hi Rajat,

Currently sql-gateway supports REST[1] and Hive[2] endpoints.

For Hive endpoints, you can submit sql jobs with existing Hive clients,
such as hive jdbc, apache superset and other systems.

For REST endpoints, you can use the Flink sql-client to submit your SQL jobs.
We will support a jdbc-driver [3] in the next release version (1.18), so you
can submit SQL jobs from any editor that supports the standard JDBC protocol,
such as Apache Superset, Tableau, etc.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/rest/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/hiveserver2/
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/jdbcdriver/
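
As a rough sketch (host names, ports and paths are placeholders), starting the
gateway with the HiveServer2 endpoint and connecting from an existing
Hive-compatible client could look like this:

```
# start the gateway with the HiveServer2 endpoint
./bin/sql-gateway.sh start \
  -Dsql-gateway.endpoint.type=hiveserver2 \
  -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/path/to/hive-conf

# connect with beeline (the HiveServer2 endpoint listens on port 10000 by default)
beeline> !connect jdbc:hive2://gateway-host:10000/default;auth=noSasl
```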

Best,
Shammon FY

On Thu, Jul 20, 2023 at 12:48 AM Rajat Ahuja 
wrote:

> Hi team,
>
> I have set up a session cluster on k8s via sql gateway.  I am looking for
> an open source Flink sql editor that can submit sql queries on top of the
> k8s session cluster. Any suggestions for sql editor to submit queries ?
>
>
> Thanks
>


Re: Checkpoint size smaller than Savepoint size

2023-07-19 Thread Shammon FY
Hi Neha,

The HOP window will increase the size of the checkpoint and I'm sorry
that I'm not very familiar with the HOP window.

If the configurations are all right, and you want to confirm if it's a HOP
window issue, I think you can submit a flink job without HOP window but
with regular agg operators, and observe whether the checkpoint and
savepoint meet expectations.

Best,
Shammon FY

On Tue, Jul 18, 2023 at 8:25 PM Neha .  wrote:

> Hi Shammon,
>
> These configs exist in Flink WebUI. We have set
> exeEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); Do
> you think it can create some issues for the HOP(proctime, some interval,
> some interval) and not releasing the state for checkpoints?
> I am really confused about why savepoints are working fine and not
> checkpoints.
>
> On Tue, Jul 18, 2023 at 6:56 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I think you can first check whether the options `state.backend` and
>> `state.backend.incremental` you mentioned above exist in
>> `JobManager`->`Configuration` in Flink webui. If they do not exist, you may
>> be using the wrong conf file.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Mon, Jul 17, 2023 at 5:04 PM Neha .  wrote:
>>
>>> Hi Shammon,
>>>
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>>
>>> This is already set in the Flink-conf. Anything else that should be
>>> taken care of for the incremental checkpointing? Is there any related bug
>>> in Flink 1.16.1? we have recently migrated to Flink 1.16.1 from Flink
>>> 1.13.6.
>>> What can be the reason for stopped incremental checkpointing?
>>>
>>> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:
>>>
>>>> Hi Neha,
>>>>
>>>> I noticed that the `Checkpointed Data Size` is always equals to `Full
>>>> Checkpoint Data Size`, I think the job is using full checkpoint instead of
>>>> incremental checkpoint,  you can check it
>>>>
>>>> Best,
>>>> Shammon FY
>>>>
>>>> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>>>>
>>>>> Hello Shammon,
>>>>>
>>>>> Thank you for your assistance.
>>>>> I have already enabled the incremental checkpointing, Attaching the
>>>>> screenshot. Can you please elaborate on what makes you think it is not
>>>>> enabled, It might hint towards the issue. The problem is checkpoint size 
>>>>> is
>>>>> not going down and keeps on increasing while savepoint size shows the
>>>>> correct behavior of going up and down with the throughput peaks.
>>>>>
>>>>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>>>>
>>>>>
>>>>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>>>>>
>>>>>> Hi Neha,
>>>>>>
>>>>>> I think it is normal for the data size of a savepoint to be smaller
>>>>>> than the full data of a checkpoint. Flink uses rocksdb to store
>>>>>> checkpointed data, which is an LSM structured storage where the same key
>>>>>> will have multiple version records, while savepoint will traverse all 
>>>>>> keys
>>>>>> and store only one record per key.
>>>>>>
>>>>>> But I noticed that you did not enable incremental checkpoint, which
>>>>>> resulted in each checkpoint saving full data. You can refer to [1] for 
>>>>>> more
>>>>>> detail and turn it on, which will reduce the data size of the checkpoint.
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>>>>> <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/*incremental-checkpoints__;Iw!!BeGeivfSdT4o5A!i6xqu0TfnOScUXZ2hWnwv1pOEjBPosucnmXfxDO3762tx0hIlwBc3e0V0ZpxUm4Q4VAPQdSXSY25U1wp$>
>>>>>>
>>>>>> Best,
>>>>>> Shammon FY
>>>>>>
>>>>>>
>>>>>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>>>>>
>>>>>>> Hello  Shammon FY,
>>>>>>>
>>>>>>> It is a production issue for me. Can you please take a look if
>>>>>>> anything can be done?
>>>>>>>
>>>>

Re: How to resume a job from checkpoint with the SQL gateway.

2023-07-18 Thread Shammon FY
Hi Xiaolong,

For newer versions such as Flink 1.17, the Flink sql-gateway supports job
management, and users can stop/start jobs with savepoints. You can start a
job from a given savepoint path as shown in [1], and stop a job with or
without a savepoint as shown in [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job
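
For illustration, in the sql-client (or through the gateway) the flow from
[1] and [2] looks roughly like this (the path, job id and table names are
placeholders):

```sql
-- stop a running job and take a savepoint
STOP JOB '228d70913eab60dda85c5e7f78b5782c' WITH SAVEPOINT;

-- start a new job from that savepoint (or from a retained checkpoint path)
SET 'execution.savepoint.path' = 'hdfs:///flink/savepoints/savepoint-xxxx';
INSERT INTO sink_table SELECT * FROM source_table;
```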

Best,
Shammon FY


On Tue, Jul 18, 2023 at 9:56 AM Xiaolong Wang 
wrote:

> Hi, Shammon,
>
> I know that the job manager can auto-recover via HA configurations, but
> what if I want to upgrade the running Flink SQL submitted by the Flink SQL
> gateway ?
>
> In normal cases, I can use the
>
>> ./flink run application -s ${SAVEPOINT_PATH} local://${FLINK_JOB_JAR}
>
> to resume a Flink job from a savepoint/checkpoint. The question is, how to
> do so with Flink sql gateway ?  What should I fill in the ${FLINK_JOB_JAR}
> field ?
>
> Thanks in advanced.
>
> On Mon, Jul 17, 2023 at 9:14 AM Shammon FY  wrote:
>
>> Hi Xiaolong,
>>
>> When a streaming job is submitted via Sql-Gateway, its lifecycle is no
>> longer related to Sql Gateway.
>>
>> Returning to the issue of job recovery, I think if your job cluster is
>> configured with HA, jobmanager will recover running streaming jobs from
>> their checkpoints after a failover occurs.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Thu, Jul 13, 2023 at 10:22 AM Xiaolong Wang <
>> xiaolong.w...@smartnews.com> wrote:
>>
>>> Hi,
>>>
>>> I'm currently working on providing a SQL gateway to submit both
>>> streaming and batch queries.
>>>
>>> My question is, if a streaming SQL is submitted and then the jobmanager
>>> crashes, is it possible to resume the streaming SQL from the latest
>>> checkpoint with the SQL gateway ?
>>>
>>>
>>>
>>


Re: Async IO For Cassandra

2023-07-17 Thread Shammon FY
Hi Pritam,

I'm sorry that I'm not familiar with Cassandra. If your async function is
always the root cause of the backpressure, I think you can measure the
latency of the async requests in your function and log some metrics.

By the way, you can add a cache in your async function to speed up the
lookup requests, which is what we usually do for lookup joins in SQL jobs.
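
As a rough sketch of that idea (the Cassandra call is stubbed out as a
hypothetical `queryCassandraAsync` helper; a real implementation would use
the async session of your Cassandra driver):

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Async lookup with a simple in-memory cache in front of the external store.
public class CachedAsyncLookup extends RichAsyncFunction<String, String> {

    private transient Map<String, String> cache;

    @Override
    public void open(Configuration parameters) {
        cache = new ConcurrentHashMap<>();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        String cached = cache.get(key);
        if (cached != null) {
            // cache hit: no request to Cassandra at all
            resultFuture.complete(Collections.singleton(cached));
            return;
        }
        queryCassandraAsync(key).whenComplete((value, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                cache.put(key, value);
                resultFuture.complete(Collections.singleton(value));
            }
        });
    }

    // Placeholder for the real async call, e.g. the async execute of your driver
    private CompletableFuture<String> queryCassandraAsync(String key) {
        return CompletableFuture.completedFuture("value-for-" + key);
    }
}

// Usage (capacity 80 as in your test):
// AsyncDataStream.unorderedWait(input, new CachedAsyncLookup(), 5, TimeUnit.SECONDS, 80);
```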


Best,
Shammon FY

On Mon, Jul 17, 2023 at 10:09 PM Pritam Agarwala <
pritamagarwala...@gmail.com> wrote:

> Hi Team,
>
>
> Any input on this will be really helpful.
>
>
> Thanks!
>
> On Tue, Jul 11, 2023 at 12:04 PM Pritam Agarwala <
> pritamagarwala...@gmail.com> wrote:
>
>> Hi Team,
>>
>>
>> I am using  "AsyncDataStream.unorderedWait" to connect to cassandra . The
>> cassandra lookup operators are becoming the busy operator and creating
>> back-pressure result low throughput.
>>
>>
>> The Cassandra lookup is a very simple query. So I increased the capacity
>> parameter to 80 from 15 and could see low busy % of cassandra operators.  I
>> am monitoring the cassandra open connections and connected host metrics.
>> Couldn't see any change on these metrics.
>>
>>
>> How is the capacity parameter related to cassandra open connections and
>> host ? If I increase capacity more will it have any impact on these metrics
>> ?
>>
>> Thanks & Regards,
>> Pritam
>>
>


Re: Checkpoint size smaller than Savepoint size

2023-07-17 Thread Shammon FY
Hi Neha,

I think you can first check whether the options `state.backend` and
`state.backend.incremental` you mentioned above exist in
`JobManager`->`Configuration` in Flink webui. If they do not exist, you may
be using the wrong conf file.

Best,
Shammon FY


On Mon, Jul 17, 2023 at 5:04 PM Neha .  wrote:

> Hi Shammon,
>
> state.backend: rocksdb
> state.backend.incremental: true
>
> This is already set in the Flink-conf. Anything else that should be taken
> care of for the incremental checkpointing? Is there any related bug in
> Flink 1.16.1? we have recently migrated to Flink 1.16.1 from Flink 1.13.6.
> What can be the reason for stopped incremental checkpointing?
>
> On Mon, Jul 17, 2023 at 11:35 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I noticed that the `Checkpointed Data Size` is always equals to `Full
>> Checkpoint Data Size`, I think the job is using full checkpoint instead of
>> incremental checkpoint,  you can check it
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:
>>
>>> Hello Shammon,
>>>
>>> Thank you for your assistance.
>>> I have already enabled the incremental checkpointing, Attaching the
>>> screenshot. Can you please elaborate on what makes you think it is not
>>> enabled, It might hint towards the issue. The problem is checkpoint size is
>>> not going down and keeps on increasing while savepoint size shows the
>>> correct behavior of going up and down with the throughput peaks.
>>>
>>> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>>>
>>>
>>> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>>>
>>>> Hi Neha,
>>>>
>>>> I think it is normal for the data size of a savepoint to be smaller
>>>> than the full data of a checkpoint. Flink uses rocksdb to store
>>>> checkpointed data, which is an LSM structured storage where the same key
>>>> will have multiple version records, while savepoint will traverse all keys
>>>> and store only one record per key.
>>>>
>>>> But I noticed that you did not enable incremental checkpoint, which
>>>> resulted in each checkpoint saving full data. You can refer to [1] for more
>>>> detail and turn it on, which will reduce the data size of the checkpoint.
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>>> <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/*incremental-checkpoints__;Iw!!BeGeivfSdT4o5A!i6xqu0TfnOScUXZ2hWnwv1pOEjBPosucnmXfxDO3762tx0hIlwBc3e0V0ZpxUm4Q4VAPQdSXSY25U1wp$>
>>>>
>>>> Best,
>>>> Shammon FY
>>>>
>>>>
>>>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>>>
>>>>> Hello  Shammon FY,
>>>>>
>>>>> It is a production issue for me. Can you please take a look if
>>>>> anything can be done?
>>>>>
>>>>> -- Forwarded message -
>>>>> From: Neha . 
>>>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>>>> Subject: Checkpoint size smaller than Savepoint size
>>>>> To: 
>>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>> According to Flink's documentation, Checkpoints are designed to be
>>>>> lightweight. However, in my Flink pipeline, I have observed that the
>>>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>>>> behavior? What are the possible scenarios that can lead to this situation?
>>>>>
>>>>> Additionally, I have noticed that the checkpoint size in my datastream
>>>>> pipeline continues to grow while the savepoint size behaves as expected.
>>>>> Could this be attributed to the usage of Common Table Expressions (CTEs) 
>>>>> in
>>>>> Flink SQL?
>>>>>
>>>>> Flink version: 1.16.1
>>>>> Incremental checkpointing is enabled.
>>>>> StateBackend: RocksDB
>>>>> Time Characteristic: Ingestion
>>>>>
>>>>> SQL:
>>>>>
>>>>> SELECT
>>>>>   *
>>>>> from
>>>>>   (
>>>>> With Actuals as (
>>>>>   SELECT
>>>>> clientOrderId,
>>>>> Cast(
>>>

Re: Checkpoint size smaller than Savepoint size

2023-07-17 Thread Shammon FY
Hi Neha,

I noticed that the `Checkpointed Data Size` always equals the `Full
Checkpoint Data Size`, so I think the job is using full checkpoints instead
of incremental checkpoints; you can check that.

Best,
Shammon FY

On Mon, Jul 17, 2023 at 10:25 AM Neha .  wrote:

> Hello Shammon,
>
> Thank you for your assistance.
> I have already enabled the incremental checkpointing, Attaching the
> screenshot. Can you please elaborate on what makes you think it is not
> enabled, It might hint towards the issue. The problem is checkpoint size is
> not going down and keeps on increasing while savepoint size shows the
> correct behavior of going up and down with the throughput peaks.
>
> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>
>
> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY  wrote:
>
>> Hi Neha,
>>
>> I think it is normal for the data size of a savepoint to be smaller than
>> the full data of a checkpoint. Flink uses rocksdb to store checkpointed
>> data, which is an LSM structured storage where the same key will have
>> multiple version records, while savepoint will traverse all keys and store
>> only one record per key.
>>
>> But I noticed that you did not enable incremental checkpoint, which
>> resulted in each checkpoint saving full data. You can refer to [1] for more
>> detail and turn it on, which will reduce the data size of the checkpoint.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>> <https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/*incremental-checkpoints__;Iw!!BeGeivfSdT4o5A!i6xqu0TfnOScUXZ2hWnwv1pOEjBPosucnmXfxDO3762tx0hIlwBc3e0V0ZpxUm4Q4VAPQdSXSY25U1wp$>
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:
>>
>>> Hello  Shammon FY,
>>>
>>> It is a production issue for me. Can you please take a look if anything
>>> can be done?
>>>
>>> -- Forwarded message -
>>> From: Neha . 
>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>> Subject: Checkpoint size smaller than Savepoint size
>>> To: 
>>>
>>>
>>> Hello,
>>>
>>> According to Flink's documentation, Checkpoints are designed to be
>>> lightweight. However, in my Flink pipeline, I have observed that the
>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>> behavior? What are the possible scenarios that can lead to this situation?
>>>
>>> Additionally, I have noticed that the checkpoint size in my datastream
>>> pipeline continues to grow while the savepoint size behaves as expected.
>>> Could this be attributed to the usage of Common Table Expressions (CTEs) in
>>> Flink SQL?
>>>
>>> Flink version: 1.16.1
>>> Incremental checkpointing is enabled.
>>> StateBackend: RocksDB
>>> Time Characteristic: Ingestion
>>>
>>> SQL:
>>>
>>> SELECT
>>>   *
>>> from
>>>   (
>>> With Actuals as (
>>>   SELECT
>>> clientOrderId,
>>> Cast(
>>>   ValueFromKeyCacheUDF(
>>> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>   ) as INT
>>> ) as zoneId,
>>> cityId,
>>> case
>>>   when status = 'ASSIGNED' then 1
>>>   else 0
>>> end as acceptance_flag,
>>> unicast.proctime
>>>   FROM
>>> order
>>> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>> AND order.proctime BETWEEN unicast.proctime - interval '70'
>>> minute
>>> AND unicast.proctime + interval '10' minute
>>> and unicast.status in ('ASSIGNED', 'REJECTED')
>>> ),
>>> zone_agg as (
>>>   select
>>> zoneId,
>>> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>> avg(cityId) as cityId,
>>> COUNT(*) as `unicast_count`,
>>> proctime() as proctime
>>>   from
>>> Actuals
>>>   group by
>>> HOP(
>>>   proctime(),
>>>   interval '5' minute,
>>>   interval '30' minute
>>> ),
>>> zoneId
>>> ),
>>> city_agg as(
>>>   select
>>> cityId,
>>> sum

Re: How to resume a job from checkpoint with the SQL gateway.

2023-07-16 Thread Shammon FY
Hi Xiaolong,

When a streaming job is submitted via Sql-Gateway, its lifecycle is no
longer related to Sql Gateway.

Returning to the issue of job recovery, I think if your job cluster is
configured with HA, jobmanager will recover running streaming jobs from
their checkpoints after a failover occurs.

Best,
Shammon FY


On Thu, Jul 13, 2023 at 10:22 AM Xiaolong Wang 
wrote:

> Hi,
>
> I'm currently working on providing a SQL gateway to submit both streaming
> and batch queries.
>
> My question is, if a streaming SQL is submitted and then the jobmanager
> crashes, is it possible to resume the streaming SQL from the latest
> checkpoint with the SQL gateway ?
>
>
>


Re: Set processing time in the past

2023-07-16 Thread Shammon FY
Hi Eugenio,

I couldn't quite catch what you mean; could you describe it in more detail?

Best,
Shammon FY

On Sat, Jul 15, 2023 at 5:14 PM Eugenio Marotti <
ing.eugenio.maro...@gmail.com> wrote:

> Hi everyone,
>
> is there a way to set Flink processing time in the past?
>
> Thanks
> Eugenio
>


Re: Re: flink1.14.5 sql-client fails to submit jobs in yarn-session mode

2023-07-16 Thread Shammon FY
Hi,

Based on the exception stack above, you can check whether the cluster id is configured; for YARN the configuration option is `yarn.application.id`.

Best,
Shammon FY


On Sat, Jul 15, 2023 at 6:50 PM 杨东树  wrote:

> Hello,
>    To follow up on the sql-client error in yarn-session mode, here is the relevant error log:
> 2023-07-15 18:43:21,503 INFO  org.apache.flink.table.client.cli.CliClient
> [] - Command history file path: /root/.flink-sql-history
> 2023-07-15 18:43:28,225 INFO
> org.apache.flink.table.catalog.CatalogManager[] - Set the
> current default database as [flink] in the current default catalog [myhive].
> 2023-07-15 18:43:38,410 WARN
> org.apache.flink.connector.kafka.source.KafkaSourceBuilder   [] - Offset
> commit on checkpoint is disabled because group.id is not specified
> 2023-07-15 18:43:39,986 WARN
> org.apache.flink.connector.kafka.source.KafkaSourceBuilder   [] - Offset
> commit on checkpoint is disabled because group.id is not specified
> 2023-07-15 18:43:40,605 WARN
> org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The
> configuration directory ('/usr/local/flink-1.14.5/conf') already contains a
> LOG4J config file.If you want to use logback, then please delete or rename
> the log configuration file.
> 2023-07-15 18:43:40,676 INFO  org.apache.hadoop.yarn.client.RMProxy
> [] - Connecting to ResourceManager at /0.0.0.0:8032
> 2023-07-15 18:43:40,788 INFO  org.apache.flink.yarn.YarnClusterDescriptor
> [] - No path for the flink jar passed. Using the location
> of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2023-07-15 18:43:40,791 WARN  org.apache.flink.table.client.cli.CliClient
> [] - Could not execute SQL statement.
> org.apache.flink.table.client.gateway.SqlExecutionException: Could not
> execute SQL statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:224)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callInserts(CliClient.java:571)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:560)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:420)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$1(CliClient.java:332)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_231]
> at
> org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:325)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
> [flink-sql-client_2.11-1.14.5.jar:1.14.5]
> Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:791)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:754)
> ~[flink-table_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeModifyOperations$4(LocalExecutor.java:222)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:88)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeModifyOperations(LocalExecutor.java:222)
> ~[flink-sql-client_2.11-1.14.5.jar:1.14.5]
> ... 12 more
> Caused by: java.lang.IllegalStateException
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
> ~[flink-dist_2.11-1.14.5.jar:1.14.5]
> at
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.ex

Re: Checkpoint size smaller than Savepoint size

2023-07-16 Thread Shammon FY
Hi Neha,

I think it is normal for the data size of a savepoint to be smaller than
the full data of a checkpoint. Flink uses RocksDB to store checkpointed
data, which is an LSM-structured store where the same key can have
multiple version records, while a savepoint traverses all keys and stores
only one record per key.

But I noticed that you did not enable incremental checkpoint, which
resulted in each checkpoint saving full data. You can refer to [1] for more
detail and turn it on, which will reduce the data size of the checkpoint.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
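
For reference, a minimal flink-conf.yaml sketch that turns this on for the
RocksDB state backend:

```yaml
state.backend: rocksdb
state.backend.incremental: true
```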

Best,
Shammon FY


On Sun, Jul 16, 2023 at 2:30 PM Neha .  wrote:

> Hello  Shammon FY,
>
> It is a production issue for me. Can you please take a look if anything
> can be done?
>
> -- Forwarded message -
> From: Neha . 
> Date: Fri, Jul 14, 2023 at 4:06 PM
> Subject: Checkpoint size smaller than Savepoint size
> To: 
>
>
> Hello,
>
> According to Flink's documentation, Checkpoints are designed to be
> lightweight. However, in my Flink pipeline, I have observed that the
> savepoint sizes are smaller than the checkpoints. Is this expected
> behavior? What are the possible scenarios that can lead to this situation?
>
> Additionally, I have noticed that the checkpoint size in my datastream
> pipeline continues to grow while the savepoint size behaves as expected.
> Could this be attributed to the usage of Common Table Expressions (CTEs) in
> Flink SQL?
>
> Flink version: 1.16.1
> Incremental checkpointing is enabled.
> StateBackend: RocksDB
> Time Characteristic: Ingestion
>
> SQL:
>
> SELECT
>   *
> from
>   (
> With Actuals as (
>   SELECT
> clientOrderId,
> Cast(
>   ValueFromKeyCacheUDF(
> concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>   ) as INT
> ) as zoneId,
> cityId,
> case
>   when status = 'ASSIGNED' then 1
>   else 0
> end as acceptance_flag,
> unicast.proctime
>   FROM
> order
> INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
> AND order.proctime BETWEEN unicast.proctime - interval '70' minute
> AND unicast.proctime + interval '10' minute
> and unicast.status in ('ASSIGNED', 'REJECTED')
> ),
> zone_agg as (
>   select
> zoneId,
> (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
> avg(cityId) as cityId,
> COUNT(*) as `unicast_count`,
> proctime() as proctime
>   from
> Actuals
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),
> zoneId
> ),
> city_agg as(
>   select
> cityId,
> sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
> proctime() as proctime
>   from
> Actuals
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),
> cityId
> ),
> final as (
>   select
> zone_agg.zoneId,
> zone_agg.cityId,
> avg(zone_agg.unicast_count) as unicast_count,
> avg(zone_agg.zone_quotient) as zone_quotient,
> avg(city_agg.city_quotient) as city_quotient
>   from
> city_agg
> INNER join zone_agg on zone_agg.cityId = city_agg.cityId
> AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60'
> minute
> AND zone_agg.proctime
>   group by
> HOP(
>   proctime(),
>   interval '5' minute,
>   interval '30' minute
> ),
> zone_agg.zoneId,
> zone_agg.cityId
> ),
> new_final as (
>   select
> 'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
> zone_quotient,
> city_quotient,
> case
>   when unicast_count > 5 then zone_quotient
>   else city_quotient
> end as `value`
>   from
> final
> )
> select
>   key,
>   case
> when new_final.`value` > 1 then 1
> else new_final.`value`
>   end as `value`,
>   zone_quotient,
>   city_quotient
> from
>   new_final
>   )
>
>
>
>
> --
> IMPORTANT NOTICE:  The contents of this email and any attachments are
> confidential in nature and intended solely for the addressee, and are
> subject to the terms and conditions of disclosure as further described
> here: https://www.scd.swiggy.in/nda. If you are not the intended
> recipient or you do not agree to the terms and conditions of disclosure,
> please delete this email immediately, and notify the sender by return
> email. In the event that you continue to access the information herein or
> act upon it in any manner, the terms and conditions shall be deemed
> accepted by you.


Re: Re: PartitionNotFoundException restart loop

2023-07-14 Thread Shammon FY
Hi,

I don't think increasing it to 3 minutes is an appropriate approach, because it will increase the job recovery time. It would be better to investigate why the upstream tasks take so long to be deployed and started successfully.

Best,
Shammon FY


On Fri, Jul 14, 2023 at 2:25 PM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> hi, last time, after increasing `taskmanager.network.request-backoff.max` from the default 10s to 30s,
> the PartitionNotFoundException restart loop still appeared after about 5 days of running.
> From the logs, the job automatically restarts after three consecutive checkpoint timeout failures (the Checkpointed Data
> Size keeps growing, up to tens or hundreds of MB even when no data is being processed), and one operator keeps failing and restarting the job.
>
> Below is the failure log of the whole process. Would increasing `taskmanager.network.request-backoff.max` further, to 3 minutes, avoid
> the PartitionNotFoundException?
>
> 2023-07-12 11:07:49,490 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Checkpoint 19177 of job 3b800d54fb6a002be7feadb1a8b6894e expired before
> completing.
> 2023-07-12 11:07:49,490 WARN
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to
> trigger or complete checkpoint 19177 for job
> 3b800d54fb6a002be7feadb1a8b6894e. (3 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> expired before completing.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216)
> [flink-dist-1.17.1.jar:1.17.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_77]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [?:1.8.0_77]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
> 2023-07-12 11:07:49,490 INFO
> org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] -
> checkpoint request time in queue: 2280006
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to
> recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold. The latest checkpoint failed due to Checkpoint expired
> before completing., view the Checkpoint History tab or the Job Manager log
> to find out why continuous checkpoints failed.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:212)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2155)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2134)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$700(CheckpointCoordinator.java:101)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_77]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ~[?:1.8.0_77]
> at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_77]
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.jobmaster.JobMaster [] - 51 tasks
> will be restarted to recover from a global failure.
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job Phase
> 2 Data Warehouse Processing (3b800d54fb6a002be7feadb1a8b6894e) switched
> from state RUNNING to RESTARTING.
>
> 2023-07-12 11:07:50,007 INFO
> org.apache.flink.runtime.executiongraph.Executi

Re: Errors after running for a while when reading data from Kafka to HDFS

2023-07-11 Thread Shammon FY
Hi

Could you post the complete exception stack trace? That would help locate the specific problem.

Best,
Shammon FY


On Wed, Jul 12, 2023 at 10:52 AM chenyu_opensource <
chenyu_opensou...@163.com> wrote:

> We are currently on Flink 1.12, reading data from Kafka into HDFS. It runs fine at first, but after a while it reports:
> Caused by: org.apache.flink.streaming.runtime.tasks.TimerException:
> org.apache.hadoop.ipc.RemoteException(java.io.IOException):
> BP-504689274-10.204.4.58-1507792652938:blk_3265799450_2192171234 does not
> exist or is not under Constructionnull
> The Flink taskmanager reports the error and the connection to HDFS is interrupted.
> The datanode log reports "DataXceiver error processing WRITE_BLOCK operation".
>
>
>
> Background: we read Kafka data with multiple sinks that handle different logic and write to different HDFS directories. The data volume is also skewed, and we already use different parallelisms to handle it, but the problem still occurs. The configured dfs.datanode.max.transfer.threads is 16384. There are also downstream jobs currently reading these HDFS directories; could that have an impact?
>
>
> Any advice would be appreciated, thanks.


Re: Store a state at a RDBMS before TTL passes by

2023-07-11 Thread Shammon FY
Hi Anastasios,

I think you may need to implement a customized trigger to emit a record when
a session window is created. You can refer to [1] for more detailed
information.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#triggers
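
As a rough, untested sketch of such a trigger (modeled on Flink's built-in
CountTrigger so it stays mergeable for session windows): it fires once on the
first element of a window, and again when the window's event time ends. With
processing-time session windows you would register processing-time timers
instead.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FireOnFirstElementTrigger<T> extends Trigger<T, TimeWindow> {

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("seen-count", new Sum(), LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        // fire once as soon as the (session) window receives its first element
        return count.get() == 1L ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(countDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(countDesc);
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }
}

class Sum implements ReduceFunction<Long> {
    @Override
    public Long reduce(Long a, Long b) {
        return a + b;
    }
}
```

The downstream reduce/process function can then treat the first firing as the
"session opened" record and later firings as updates.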

Best,
Shammon FY

On Tue, Jul 11, 2023 at 4:22 PM Anastasios Makris <
anastasios.makris...@gmail.com> wrote:

> Hi Shammon,
>
> I implemented a session window with a static time gap, followed by a
> reduce function that keeps only the latest record.
>
> Your proposed solution was what I was looking for.
> Thank you!
>
> But I have another question. What if I want to write the 1st record of a
> session window when it starts in order to know that it opened a new session
> window that currently processes the data?
> Is it possible?
>
>
> Best regards,
>
> Anastasis
>
> On Fri, Jun 16, 2023 at 10:04 AM Anastasios Makris <
> anastasios.makris...@gmail.com> wrote:
>
>> Thank you very much Shammon for your reply.
>> I am going to study this solution and write back to you as soon as
>> possible.
>>
>> Best Regards,
>> Anastasis
>>
>> On Fri, Jun 16, 2023 at 4:14 AM Shammon FY  wrote:
>>
>>> Hi Anastasios,
>>>
>>> What you want sounds like a session window [1], maybe you can refer to
>>> the doc for more details.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#session-windows
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Thu, Jun 15, 2023 at 10:03 PM Anastasios Makris <
>>> anastasios.makris...@gmail.com> wrote:
>>>
>>>> Hi Flink users,
>>>>
>>>> I created a KeyedStream that tracks for each user of my website some
>>>> metrics. It's time a user produces an event the metrics are recomputed and
>>>> change.
>>>> I would like to keep the outcome of a user's session at an RDBMS, which
>>>> will be a single row.
>>>>
>>>> The first and obvious solution would be to Insert the row at the RDBMS
>>>> and then update it, it's time something new occurs.
>>>>
>>>> I would like to ask if another solution is possible.
>>>> For example, could we maybe schedule to use the RDBMS as a sink of the
>>>> state before its TTL passes by?
>>>>
>>>> Best regards,
>>>> Anastasis
>>>>
>>>


Re: Local State Storage

2023-07-11 Thread Shammon FY
Hi amenreet,

I think there are two ways to clean up state data in the flink job
automatically:

1. State TTL. You can configure the TTL [1] for state according to your
requirements, and the Flink job will clean up the data once it expires.
For Flink SQL jobs you can set a global TTL for all operators, and set a
per-operator TTL as described in [2].

2. If you use the RocksDB state backend in your job, Flink will delete the
SST files once they are no longer referenced by any checkpoint.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-280%3A+Introduce+EXPLAIN+PLAN_ADVICE+to+provide+SQL+advice
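
A minimal sketch of point 1 for a DataStream job (the one-hour TTL is just an
example value):

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlStateExample {
    public static ValueStateDescriptor<String> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(1))                                   // expire 1 hour after the last write
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupInRocksdbCompactFilter(1000)                         // clean up during RocksDB compaction
                .build();

        ValueStateDescriptor<String> descriptor = new ValueStateDescriptor<>("my-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```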

Best,
Shammon FY


On Tue, Jul 11, 2023 at 3:20 PM amenreet sodhi  wrote:

> Hi Team,
>
> I wanted to confirm, the local state which TM stores in the directory
> either we provide through config or default i.e. /tmp folder, does it clear
> itself from time to time or the size just keeps on increasing?
>
> Thanks
> Regards
> Amenreet Singh Sodhi
>
>


Re: flink1.14.5 sql-client fails to submit jobs in yarn-session mode

2023-07-09 Thread Shammon FY
Hi,

The image in your email is not visible.

Best,
Shammon FY

On Sun, Jul 9, 2023 at 7:30 PM 杨东树  wrote:

> Hi all,
>    While using the Flink 1.14.5 sql-client in yarn-session mode, I found that SQL jobs cannot be executed normally; the log reports the error below. Any guidance is appreciated, thanks:
>    Background:
>    1. With execution.target: yarn-per-job configured, SQL jobs executed from the sql-client run normally.
>    2. With execution.target: yarn-session configured and a Flink yarn-session cluster started, the same SQL job executed from the sql-client fails with the error shown in the image above.
>


Re: Flink in HA mode causing JM Failure

2023-07-07 Thread Shammon FY
Hi amenreet,

According to the error message, I think you can log into the JM pod after it
starts and check the access permissions for the directory
`file:///opt/flink/pm/ha`.

Best,
Shammon FY


On Fri, Jul 7, 2023 at 6:04 PM amenreet sodhi  wrote:

> Hi Shammon
>
> I am using an external NFS mount which gets mounted at path
> /opt/flink/pm/, and the path that is mentioned there refers to that
> only, so not a local file. Could there be any other configuration issue?
>
> Thanks
> Regard
> Amenreet Singh Sodhi
>
> On Fri, Jul 7, 2023 at 2:00 PM Shammon FY  wrote:
>
>> Hi amenreet,
>>
>> Maybe you can try to use hdfs or s3 for `high-availability.storageDir`, I
>> found your current job is using a local file which is started with
>> `file:///`.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Fri, Jul 7, 2023 at 4:20 PM amenreet sodhi 
>> wrote:
>>
>>> Hi All,
>>> I am deploying Flink cluster on Kubernetes in HA mode. But i noticed,
>>> whenever i deploy Flink cluster for first time on K8s cluster, it is not
>>> able to populate the cluster configmap, and due to which JM fails with the
>>> following exception:
>>>
>>> 2023-07-06 16:46:11,428 ERROR 
>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal 
>>> error occurred in the cluster entrypoint.
>>> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
>>> The base directory of the JobResultStore isn't accessible. No dirty 
>>> JobResults can be restored.
>>> at 
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>>>  ~[?:?]
>>> at 
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>>>  [?:?]
>>> at 
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>>>  [?:?]
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>  [?:?]
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>  [?:?]
>>> at java.lang.Thread.run(Thread.java:834) [?:?]
>>> Caused by: java.lang.IllegalStateException: The base directory of the 
>>> JobResultStore isn't accessible. No dirty JobResults can be restored.
>>> at 
>>> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
>>> ~[event_executor-1.1.20.jar:?]
>>> at 
>>> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182)
>>>  ~[event_executor-1.1.20.jar:?]
>>> at 
>>> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
>>>  ~[event_executor-1.1.20.jar:?]
>>> at 
>>> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
>>>  ~[event_executor-1.1.20.jar:?]
>>> at 
>>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
>>>  ~[event_executor-1.1.20.jar:?]
>>> at 
>>> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
>>>  ~[event_executor-1.1.20.jar:?]
>>> at 
>>> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
>>>  ~[event_executor-1.1.20.jar:?]
>>> at 
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>>>  ~[?:?]
>>>
>>> Once we reinstall/helm upgrade then this exception goes away. How can
>>> this be resolved, any additional configuration required to resolve this?
>>>
>>> I am using the following configuration for HA:
>>>
>>>  high-availability.storageDir: file:///opt/flink/pm/ha
>>> kubernetes.cluster-id: {{ include "fullname" . }}-cluster-{{ now | date 
>>> "20060102150405" }}
>>> high-availability.jobmanager.port: 6123
>>> high-availability.type: kubernetes
>>> high-availability: 
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> kubernetes.namespace: {{ .Release.Namespace }}
>>>
>>> Thanks
>>>
>>> Regards
>>> Amenreet Singh Sodhi
>>>
>>>


Re: Flink in HA mode causing JM Failure

2023-07-07 Thread Shammon FY
Hi amenreet,

Maybe you can try to use HDFS or S3 for `high-availability.storageDir`; I
noticed your current job is using a local path, which starts with
`file:///`.
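
For example (bucket and paths are placeholders), any shared, durable
filesystem that every JobManager can reach would work:

```yaml
high-availability.type: kubernetes
high-availability.storageDir: s3://my-bucket/flink/recovery   # or hdfs:///flink/recovery
kubernetes.cluster-id: my-flink-cluster
```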

Best,
Shammon FY


On Fri, Jul 7, 2023 at 4:20 PM amenreet sodhi  wrote:

> Hi All,
> I am deploying Flink cluster on Kubernetes in HA mode. But i noticed,
> whenever i deploy Flink cluster for first time on K8s cluster, it is not
> able to populate the cluster configmap, and due to which JM fails with the
> following exception:
>
> 2023-07-06 16:46:11,428 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal error 
> occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException: java.lang.IllegalStateException: 
> The base directory of the JobResultStore isn't accessible. No dirty 
> JobResults can be restored.
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
>  ~[?:?]
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
>  [?:?]
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
>  [?:?]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  [?:?]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  [?:?]
>   at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: java.lang.IllegalStateException: The base directory of the 
> JobResultStore isn't accessible. No dirty JobResults can be restored.
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) 
> ~[event_executor-1.1.20.jar:?]
>   at 
> org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182)
>  ~[event_executor-1.1.20.jar:?]
>   at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
>  ~[event_executor-1.1.20.jar:?]
>   at 
> org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
>  ~[event_executor-1.1.20.jar:?]
>   at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
>  ~[event_executor-1.1.20.jar:?]
>   at 
> org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
>  ~[event_executor-1.1.20.jar:?]
>   at 
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
>  ~[event_executor-1.1.20.jar:?]
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
>  ~[?:?]
>
> Once we reinstall/helm upgrade then this exception goes away. How can this
> be resolved, any additional configuration required to resolve this?
>
> I am using the following configuration for HA:
>
>  high-availability.storageDir: file:///opt/flink/pm/ha
> kubernetes.cluster-id: {{ include "fullname" . }}-cluster-{{ now | date 
> "20060102150405" }}
> high-availability.jobmanager.port: 6123
> high-availability.type: kubernetes
> high-availability: 
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> kubernetes.namespace: {{ .Release.Namespace }}
>
> Thanks
>
> Regards
> Amenreet Singh Sodhi
>
>


Re: How to use the Flink SQL Gateway with Flink on native Kubernetes

2023-07-05 Thread Shammon FY
Hi,

Our approach is to start the SQL Gateway on another node (in a separate pod, or standalone) after the Flink cluster is up, and let it connect to the Flink cluster remotely via the cluster's address. This keeps the SQL Gateway deployment completely separate from the Flink cluster.
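
A rough sketch of what that can look like (the addresses below are
assumptions to adapt to your setup; in particular, how the gateway locates
the session cluster depends on your Flink configuration, e.g.
`rest.address`/`rest.port` pointing at the cluster's REST service):

```
# run the gateway in its own pod / on its own host, not inside the JobManager pod
./bin/sql-gateway.sh start \
  -Dsql-gateway.endpoint.rest.address=0.0.0.0 \
  -Drest.address=flink-cluster-rest.flink.svc \
  -Drest.port=8081
```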

Best,
Shammon FY


On Tue, Jul 4, 2023 at 10:52 AM chaojianok  wrote:

> Hi everyone, I have a question.
>
> I have deployed Flink on a k8s cluster in native Kubernetes mode, and now I need to use the Flink SQL
> gateway with this Flink cluster. Does anyone have a good solution?
> My current approach is to enter the pod to start the sql gateway, and then create a flink-sql-gateway
> service in k8s, so the sql gateway can be accessed through that
> service. The problem with this approach is that the deployment requires entering the pod to start the service, which is bad for automated deployment. The exact commands are below; please help me find a better way to avoid this.
>
> 1. Create the Flink cluster
> ./bin/kubernetes-session.sh \
> -Dkubernetes.cluster-id=flink-cluster \
> -Dkubernetes.namespace=flink \
> -Dkubernetes.service-account=flink-service-account \
> -Dkubernetes.rest-service.exposed.type=NodePort
>
> 2. Enter the pod, start the sql gateway service via ./bin/sql-gateway.sh start
> -Dsql-gateway.endpoint.rest.address=localhost, then exit the pod
>
> 3. Create the flink-sql-gateway service
> kubectl expose deployment flink-cluster --type=NodePort --port=8083
> --name=flink-sql-gateway -n flink
>


Re: Re: PartitionNotFoundException restart loop

2023-07-05 Thread Shammon FY
Hi,

If you want to increase the retry time for partition requests, you can adjust
the configuration option `taskmanager.network.request-backoff.max` (the default is 10 seconds); see [1] for the details.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-taskmanageroptions
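
For example, in flink-conf.yaml (the value is in milliseconds; 30000 here is
only illustrative):

```yaml
taskmanager.network.request-backoff.max: 30000
```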

Best,
Shammon FY

On Tue, Jul 4, 2023 at 11:38 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> From the earlier logs, after the restart the job loads the checkpoint data (about 100 MB) from HDFS, and that seems to take quite a while. Also, regarding the Kafka consumption and
> the downstream timeout retries, can the number of retries or the duration be configured?
>
> From: Shammon FY
> Sent: 2023-07-04 10:12
> To: user-zh
> Subject: Re: PartitionNotFoundException restart loop
> Hi,
>
> The PartitionNotFoundException is usually caused by the downstream task sending a partition
>
> request to the upstream task while the upstream task has not been deployed successfully yet. Normally the downstream task retries and throws the exception after the timeout. You can check whether there are other exception logs and find out why the upstream task was not deployed successfully.
>
> Best,
> Shammon FY
>
> On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
> zhan...@eastcom-sw.com> wrote:
>
> >
> > Exception log content
> >
> > 2023-07-03 20:30:15,164 INFO
> > org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> > Sink 3 (2/45)
> > (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> > 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> > nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> > org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > Partition
> >
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> > 3093 not found.
> > at org.apache.flink.runtime.io
> .network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at org.apache.flink.runtime.io
> .network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> > ~[flink-dist-1.17.1.jar:1.17.1]
> > at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> > at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
> >
> >
> >
> > From: zhan...@eastcom-sw.com
> > Sent: 2023-07-04 09:25
> > To: user-zh
> > Subject: PartitionNotFoundException restart loop
> > hi, I have two jobs with relatively high traffic (300 million / 600 million records per day). After starting and running normally for about 5-6 days,
> > the PartitionNotFoundException appears and the jobs keep restarting in a loop.
> >
> > After adding the following parameters to flink-conf.yaml, the PartitionNotFoundException
> > still shows up in a loop after 6 days, followed by constant restarts:
> > taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> > taskmanager.network.max-num-tcp-connections: 16
> >
> > The current version is 1.17.1; the same jobs and data always ran fine on 1.14.4. Is there any way to solve this?
> >
> >
>


Re: SQL-gateway Failed to Run

2023-07-04 Thread Shammon FY
Hi Xiaolong,

I think you may need to check the error log in the flink cluster to find
out the root cause.

Best,
Shammon FY

On Tue, Jul 4, 2023 at 3:38 PM Xiaolong Wang 
wrote:

> The flink web ui is fine until I run the Hive query. After that the flink
> deployment is down and the web UI is not accessible.
>
> On Tue, Jul 4, 2023 at 9:13 AM Shammon FY  wrote:
>
>> Hi Xiaolong,
>>
>> From the exception it seems that the flink session cluster is not
>> running properly. Can you visit the flink web ui and everything is ok?
>>
>> Best,
>> Shammon FY
>>
>> On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang 
>> wrote:
>>
>>> Hi,
>>> I've tested the Flink SQL-gateway to run some simple Hive queries but
>>> met some exceptions.
>>>
>>>
>>> Environment Description:
>>> Run on : Kubernetes
>>> Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
>>> Steps to run:
>>> 1. Apply a `flinkdeployment` of flink session cluster to flink operator
>>> ```
>>> apiVersion: flink.apache.org/v1beta1
>>> kind: FlinkDeployment
>>> metadata:
>>>   name: flink-session-cluster-example
>>>   namespace: xxx
>>> spec:
>>>   image: xxx/flink:1.17-sql-gateway-dev
>>>   flinkVersion: v1_17
>>>   flinkConfiguration:
>>> taskmanager.numberOfTaskSlots: "2"
>>> pipeline.max-parallelism: "1000"
>>> state.backend.type: rocksdb
>>> state.backend.incremental: "true"
>>> state.checkpoints.dir: xxx
>>> execution.checkpointing.interval: 1m
>>> execution.checkpointing.timeout: 30m
>>> high-availability:
>>> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>>> high-availability.storageDir: xxx
>>> akka.framesize: 20971520b
>>> execution.checkpointing.externalized-checkpoint-retention:
>>> RETAIN_ON_CANCELLATION
>>> taskmanager.memory.managed.fraction: "0.2"
>>> kubernetes.hadoop.conf.config-map.name: xxx
>>>   serviceAccount: default
>>>   podTemplate:
>>> apiVersion: v1
>>> kind: Pod
>>> metadata:
>>>   name: pod-template
>>> spec:
>>>   serviceAccount: default
>>>   jobManager:
>>> resource:
>>>   memory: "2048m"
>>>   cpu: 1
>>>   taskManager:
>>> resource:
>>>   memory: "4096m"
>>>   cpu: 1
>>> ```
>>> This image has been built with a `hadoop dependency` , an existing
>>> `hadoop configmap`.
>>>
>>> 2. Login to the job-manager pod and run the followings
>>> `./bin/sql-gateway.sh start-foreground
>>> -Dsql-gateway.endpoint.type=hiveserver2
>>> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`
>>>
>>> 3. Start a beeline and connect to the SQL gateway then run a simple Hive
>>> query
>>> `select count(1) from simple_demo_output where dt = '2021-08-14';`
>>>
>>> 4.The SQL gateway goes wrong with the following logs:
>>> ```
>>>
>>> 2023-07-03 06:27:11,078 INFO  
>>> org.apache.flink.client.program.rest.RestClusterClient
>>>   [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).
>>>
>>> 2023-07-03 06:27:15,092 INFO  
>>> org.apache.flink.client.program.rest.RestClusterClient
>>>   [] - Successfully submitted job 'collect'
>>> (4c99c40392cb935d3df94891655d2ce5) to '
>>> http://flink-session-cluster-example-rest.realtime-streaming:8081'.
>>>
>>> 2023-07-03 06:27:15,879 ERROR
>>> org.apache.flink.table.gateway.service.operation.OperationManager [] -
>>> Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.
>>>
>>> org.apache.flink.table.api.TableException: Failed to execute sql
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
>>> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>>>
>>> at
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
>>> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>>>
>>> at
>>> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
>>> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>

Re: PartitionNotFoundException restart loop

2023-07-03 Thread Shammon FY
Hi,

The usual cause of PartitionNotFoundException is that a downstream task sent a partition
request to an upstream task, but the upstream task had not been deployed successfully yet. Normally the downstream task retries and throws the exception after the timeout. Please check whether there are other exception logs and find out why the upstream task was not deployed successfully.

Best,
Shammon FY

On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

>
> Exception log content
>
> 2023-07-03 20:30:15,164 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> Sink 3 (2/45)
> (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> 3093 not found.
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
>
>
>
> From: zhan...@eastcom-sw.com
> Sent: 2023-07-04 09:25
> To: user-zh
> Subject: PartitionNotFoundException restart loop
> Hi, I have two jobs with fairly heavy traffic (300 million / 600 million records per day). After running
> normally for about 5 or 6 days after startup, they hit PartitionNotFoundException and then keep restarting in a loop.
>
> After adding the following parameters to flink-conf.yaml, the same thing happens: after about 6 days
> the jobs keep reporting PartitionNotFoundException and restarting.
> taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> taskmanager.network.max-num-tcp-connections: 16
>
> The current version is 1.17.1; the same jobs and data never had this problem on 1.14.4. Is there any way to solve this?
>
>


Re: SQL-gateway Failed to Run

2023-07-03 Thread Shammon FY
Hi Xiaolong,

From the exception it seems that the flink session cluster is not
running properly. Can you visit the flink web ui and everything is ok?

Best,
Shammon FY

On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang 
wrote:

> Hi,
> I've tested the Flink SQL-gateway to run some simple Hive queries but met
> some exceptions.
>
>
> Environment Description:
> Run on : Kubernetes
> Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
> Steps to run:
> 1. Apply a `flinkdeployment` of flink session cluster to flink operator
> ```
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-session-cluster-example
>   namespace: xxx
> spec:
>   image: xxx/flink:1.17-sql-gateway-dev
>   flinkVersion: v1_17
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> pipeline.max-parallelism: "1000"
> state.backend.type: rocksdb
> state.backend.incremental: "true"
> state.checkpoints.dir: xxx
> execution.checkpointing.interval: 1m
> execution.checkpointing.timeout: 30m
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: xxx
> akka.framesize: 20971520b
> execution.checkpointing.externalized-checkpoint-retention:
> RETAIN_ON_CANCELLATION
> taskmanager.memory.managed.fraction: "0.2"
> kubernetes.hadoop.conf.config-map.name: xxx
>   serviceAccount: default
>   podTemplate:
> apiVersion: v1
> kind: Pod
> metadata:
>   name: pod-template
> spec:
>   serviceAccount: default
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "4096m"
>   cpu: 1
> ```
> This image has been built with a `hadoop dependency` , an existing `hadoop
> configmap`.
>
> 2. Login to the job-manager pod and run the followings
> `./bin/sql-gateway.sh start-foreground
> -Dsql-gateway.endpoint.type=hiveserver2
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`
>
> 3. Start a beeline and connect to the SQL gateway then run a simple Hive
> query
> `select count(1) from simple_demo_output where dt = '2021-08-14';`
>
> 4.The SQL gateway goes wrong with the following logs:
> ```
>
> 2023-07-03 06:27:11,078 INFO  
> org.apache.flink.client.program.rest.RestClusterClient
>   [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).
>
> 2023-07-03 06:27:15,092 INFO  
> org.apache.flink.client.program.rest.RestClusterClient
>   [] - Successfully submitted job 'collect'
> (4c99c40392cb935d3df94891655d2ce5) to '
> http://flink-session-cluster-example-rest.realtime-streaming:8081'.
>
> 2023-07-03 06:27:15,879 ERROR
> org.apache.flink.table.gateway.service.operation.OperationManager [] -
> Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.
>
> org.apache.flink.table.api.TableException: Failed to execute sql
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>
> at java.

Re: Checkpointed data size is zero

2023-07-03 Thread Shammon FY
Hi Kamal,

You can check whether the Flink job has read data from the source in the Flink web
UI; it shows the total record count and size for each operator.

Best,
Shammon FY

On Sat, Jul 1, 2023 at 4:53 PM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> I have a requirement to read data coming over TCP socket stream and for
> the same written one custom source function reading data by TCP socket.
>
>
>
> Job is running successfully but in flink dashboard checkpoint overview,
> checkpointed data size is 0.
>
>
>
> Can you please help if there is anything need to check or some
> issue/limitation due to TCP streaming?
>
>
>
> Rgds,
>
> Kamal
>


Re: Query around Rocksdb

2023-07-03 Thread Shammon FY
Hi neha,

Which Flink version are you using? We have also encountered continuous growth of
off-heap memory in the TMs of a session cluster before; the reason was that memory
fragments could not be reused, as described in issue [1]. You can check the memory
allocator and try using jemalloc instead, referring to docs [2] and [3].

[1] https://issues.apache.org/jira/browse/FLINK-19125
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator

Best,
Shammon FY

On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:

> Hello,
>
> I am trying to debug the unbounded memory consumption by the Flink
> process. The heap size of the process remains the same. The size of the RSS
> of the process keeps on increasing. I suspect it might be because of
> RocksDB.
>
> We have the default value for state.backend.rocksdb.memory.managed, which is
> true. Can anyone confirm whether with this config RocksDB is still able to take
> unbounded native memory?
>
> If yes, what metrics can I check to confirm the issue? Any help would be
> appreciated.
>


Re: Data & Task distribution among the available Nodes

2023-06-30 Thread Shammon FY
Hi Mahmoud,

For the third question, Flink currently uses Fine-Grained Resource Management
to choose a TM for tasks; you can refer to the doc [1] for more information.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/

Best,
Shammon FY


On Thu, Jun 29, 2023 at 4:17 PM Martijn Visser 
wrote:

> Hi Mahmoud,
>
> While it's not an answer to your questions, I do want to point out
> that the DataSet API is deprecated and will be removed in a future
> version of Flink. I would recommend moving to either the Table API or
> the DataStream API.
>
> Best regards,
>
> Martijn
>
> On Thu, Jun 22, 2023 at 6:14 PM Mahmoud Awad 
> wrote:
> >
> > Hello everyone,
> >
> > I am trying to understand the mechanism by which Flink distributes the
> data and the tasks among the nodes/task managers in the cluster, assuming
> all TMs have equal resources. I am using the DataSet API on my own machine.
> > I will try to address the issue with the following questions:
> >
> > - When we first read the data from the source (Text, CSV, etc.), how
> does Flink ensure a fair distribution of data from the source to the
> next subtask?
> >
> > - Are there any preferences by which Flink will prefer one task manager
> over another (assuming all task managers have equal resources)?
> >
> > - Based on what will Flink choose to deploy a specific task on a
> specific task manager?
> >
> > I hope I was able to explain my point, thank you in advance.
> >
> > Best regards
> > Mahmoud
> >
> >
> >
> > Gesendet von Mail für Windows
> >
> >
>


Re: About Flink batch processing

2023-06-30 Thread Shammon FY
Hi

Yes, you can. DataStream has many built-in data types and also supports custom data types with custom serialization and deserialization, and you can then run the computation on the data inside the DataStream operators. You can refer to the official DataStream documentation [1] and the data types documentation [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/overview/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/serialization/types_serialization/
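
For illustration, here is a rough sketch (the Matrix POJO and its field are made-up names, not a Flink API) of passing a record holding a matrix between DataStream operators in batch mode:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MatrixBatchSketch {
    // A simple POJO: public no-arg constructor + public fields make it a Flink POJO type
    public static class Matrix {
        public double[][] values;
        public Matrix() {}
        public Matrix(double[][] values) { this.values = values; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(new Matrix(new double[][]{{1, 2}, {3, 4}}))
           .map(m -> {
               // operate on the matrix directly inside the operator
               m.values[0][0] += 1;
               return m;
           })
           .returns(Matrix.class)
           .print();

        env.execute("matrix-batch-sketch");
    }
}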

Best,
Shammon FY

On Fri, Jun 30, 2023 at 10:34 AM Liu Join  wrote:

> A question: when using Flink for batch processing with the DataStream API, is there anything that can be optimized, and can the data be passed directly between operators as matrices for computation?
>


Re: Very long launch of the Flink application in BATCH mode

2023-06-27 Thread Shammon FY
Hi Brendan,

I think you need to confirm at which stage the job is blocked: is the client
still submitting the job, is the resourcemanager scheduling the job, or are the
tasks launching in the TMs? Maybe you need to provide more information to help
us figure out the issue.

Best,
Shammon FY

On Tuesday, June 27, 2023, Weihua Hu  wrote:

> Hi, Brendan
>
> Judging from the log, it looks like it is still invoking your main method. You can
> add more logs in the main method to figure out which part takes too long.
>
> Best,
> Weihua
>
>
> On Tue, Jun 27, 2023 at 5:06 AM Brendan Cortez <
> brendan.cortez...@gmail.com> wrote:
>
>> No, I'm using a collection source + 20 same JDBC lookups + Kafka sink.
>>
>> On Mon, 26 Jun 2023 at 19:17, Yaroslav Tkachenko 
>> wrote:
>>
>>> Hey Brendan,
>>>
>>> Do you use a file source by any chance?
>>>
>>> On Mon, Jun 26, 2023 at 4:31 AM Brendan Cortez <
>>> brendan.cortez...@gmail.com> wrote:
>>>
>>>> Hi all!
>>>>
>>>> I'm trying to submit a Flink Job in Application Mode in the Kubernetes
>>>> cluster.
>>>>
>>>> I see some problems when an application has a big number of operators
>>>> (more than 20 same operators) - it freezes for ~6 minutes after
>>>> *2023-06-21 15:46:45,082 WARN
>>>>  org.apache.flink.connector.kafka.sink.KafkaSinkBuilder   [] - Property
>>>> [transaction.timeout.ms] not specified.
>>>> Setting it to PT1H*
>>>>  and until
>>>>
>>>> *2023-06-21 15:53:20,002 INFO
>>>>  org.apache.flink.streaming.api.graph.StreamGraphGenerator[] - Disabled
>>>> Checkpointing. Checkpointing is not supported and not needed when executing
>>>> jobs in BATCH mode.*(logs in attachment)
>>>>
>>>> When I set log.level=DEBUG, I see only this message each 10 seconds:
>>>> *2023-06-21 14:51:30,921 DEBUG
>>>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
>>>> Trigger heartbeat request.*
>>>>
>>>> Please, could you help me understand the cause of this problem and how
>>>> to fix it. I use the Flink 1.15.3 version.
>>>>
>>>> Thank you in advance!
>>>>
>>>> Best regards,
>>>> Brendan Cortez.
>>>>
>>>


Re: Query on Flink SQL primary key for nested field

2023-06-24 Thread Shammon FY
Hi elakiya,

I think you may need to flatten the columns in the key and value formats; then
you can use the specific column as a primary key in the DDL.
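
As a rough sketch of that idea (the flattened column names are assumptions, and the value format / Avro schema would have to expose the fields at the top level for this to match), the key and value fields can be declared as top-level columns so the key column can serve as the primary key:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FlattenedEmployeeDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical flattened schema: the Kafka message key carries the id,
        // the value fields are declared directly instead of a nested ROW.
        String statement =
                "CREATE TABLE Employee (\n"
              + "  id STRING,\n"
              + "  name STRING,\n"
              + "  PRIMARY KEY (id) NOT ENFORCED\n"
              + ") WITH (\n"
              + "  'connector' = 'upsert-kafka',\n"
              + "  'topic' = 'employee',\n"
              + "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\n"
              + "  'key.format' = 'raw',\n"
              + "  'value.format' = 'avro-confluent',\n"
              + "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081'\n"
              + ")";
        tableEnv.executeSql(statement);
    }
}

With upsert-kafka, the key fields are derived from the PRIMARY KEY clause, so no separate key-field option is needed.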

Best,
Shammon FY

On Fri, Jun 23, 2023 at 6:36 PM elakiya udhayanan 
wrote:

> Hi team,
>
> I have a Kafka topic named employee which uses confluent avro schema and
> will emit the payload as below:
>
> {
> "employee": {
> "id": "123456",
> "name": "sampleName"
> }
> }
> I am using the upsert-kafka connector to consume the events from the above
> Kafka topic as below using the Flink SQL DDL statement, also here I want to
> use the id field as the Primary key. But I am unable to use the id field
> since it is inside the object.
>
> DDL Statement:
> String statement = "CREATE TABLE Employee (\r\n" +
> "  employee  ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (employee.id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
> Any help is appreciated TIA
>
> Thanks,
> Elakiya
>


Re: Task Slot Resource Allocation

2023-06-20 Thread Shammon FY
Hi Pritam,

Slots in a TM are more of a logical management unit for cores, which means the TM
does not pin physical cores to each slot. The RM manages the core usage
of each TM, for example TM1 using 2 cores and TM2 using 3 cores. Based on
the resource usage required by the task (also logical and estimated), the RM
allocates slots from the TMs for the task. During the execution phase, whether
a slot really uses that many cores, fewer, or more than allocated, the TM does
not control it.

Currently, `Fine-Grained Resource Management` should be the default
resource management on the main branch of Flink; you can refer to doc [1]
for more information.

Returning to your question, if you find a bottleneck in the TM CPU usage of
the job, you can try increasing the number of TM cores, which will improve
job performance.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/finegrained_resource/
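
As a small sketch of the fine-grained API described in [1] (the group name and resource figures are illustrative assumptions, and this assumes your Flink version supports fine-grained resource management):

import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FineGrainedSlotSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Declare a slot sharing group with an explicit (logical) resource profile.
        SlotSharingGroup heavyGroup = SlotSharingGroup.newBuilder("heavy")
                .setCpuCores(1.0)
                .setTaskHeapMemory(MemorySize.ofMebiBytes(256))
                .build();

        env.fromElements(1, 2, 3)
                .map(v -> v * 2)
                .slotSharingGroup(heavyGroup) // tasks in this group get slots sized to the declared profile
                .print();

        env.execute("fine-grained-slot-sketch");
    }
}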

Best,
Shammon FY

On Wed, Jun 21, 2023 at 3:18 AM Pritam Agarwala 
wrote:

> Hi Team,
>
> I have a confusion about how TM's core will be used for task slots.
>
> I have TMs with 3 cores and 4 task slots . Each task slot will be using
> one core to perform a task and one task slot has to wait to get resources ?
> If I increase the cores to 4 , same as task slots per TM will it help to
> improve performance ?? Is there any thumb rule ?
>
>
> Thanks & Regards,
> Pritam
>


Re: Flink Checkpoint times out with checkpointed data size doubles every checkpoint.

2023-06-20 Thread Shammon FY
Hi Prabhu,

I found that the size of `Full Checkpoint Data Size` is equal to
`Checkpointed Data Size`. So which state backend are you using? I
recommend using the RocksDB state backend for your job, and if you do, you can
turn on incremental checkpoints [1], which will reduce the state size of each
checkpoint.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#incremental-checkpoints
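
A minimal sketch of turning that on (assuming the flink-statebackend-rocksdb dependency is on the classpath; the interval and checkpoint path are placeholder values):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB state backend with incremental checkpoints enabled
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.enableCheckpointing(60_000); // checkpoint every minute
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");

        env.fromElements(1, 2, 3).print();
        env.execute("incremental-checkpoint-sketch");
    }
}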

Best,
Shammon FY

On Tue, Jun 20, 2023 at 4:50 PM Alex Nitavsky 
wrote:

> Hello Prabhu,
>
> In your place I would check:
>
> 1. That there is no "state leak" in your job, because it seems that state
> only accumulates for the job and is never cleaned, e.g. probably some timer
> which cleans the state for some key is not configured correctly.
>
> 2. Probably you accumulate the state in a big window, e.g. in a 2 hour
> Tumbling window the maximum job state will be reached in two hours only. So
> your job should be scaled or optimized.
>
> Best
> Alex
>
> On Tue, Jun 20, 2023 at 10:39 AM Prabhu Joseph 
> wrote:
>
>> Hi,
>>
>> Flink Checkpoint times out with checkpointed data size doubles every
>> checkpoint. Any ideas on what could be wrong in the application or how to
>> debug this?
>>
>> [image: checkpoint_issue.png]
>>
>>
>>


Re: Flink 1.14 requesting extremely large memory

2023-06-19 Thread Shammon FY
Hi,

Is this Doris sink implemented by yourself, or is it provided officially by Flink or Doris? From the error it looks like the sink node requested an extremely large amount of memory; you can check whether something is wrong there, or whether there is a configuration option for it.

Best,
Shammon FY

On Mon, Jun 19, 2023 at 4:19 PM 郭欣瑞  wrote:

> When I test a job in the IDE, the job stays in the CREATED state, and after a long time it reports the following error.
>
> DeclarativeSlotPoolBridge.java:351  - Could not acquire the minimum
> required resources, failing slot requests. Acquired:
> [ResourceRequirement{resourceProfile=ResourceProfile{taskHeapMemory=1024.000gb
> (1099511627776 bytes), taskOffHeapMemory=1024.000gb (1099511627776 bytes),
> managedMemory=128.000mb (134217728 bytes), networkMemory=64.000mb (67108864
> bytes)}, numberOfRequiredSlots=1}]. Current slot pool status: Registered
> TMs: 1, registered slots: 1 free slots: 0
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not acquire the minimum required resources.
>
> After some investigation, the most suspicious part is a Doris RowData stream-load sink; after commenting it out and replacing it with a sink that writes to a local file, the job runs normally.
> Here is my Doris sink code; the flink-doris-connector version is 1.1.1.
> DorisSink.Builder builder = DorisSink.builder();
> DorisOptions.Builder dorisBuilder = DorisOptions.builder();
> dorisBuilder.setFenodes(parameterTool.get("doris.FE_IP"))
>
> .setTableIdentifier(parameterTool.get("doris.sfinx_database")+"."+parameterTool.get("doris.table.asset_tag_data","asset_tag_data"))
> .setUsername(parameterTool.get("doris.user"))
> .setPassword(parameterTool.get("doris.password"));
>
> Properties pro = new Properties();
> pro.setProperty("format", "json");
> pro.setProperty("read_json_by_line", "true");
>
> Date date = new Date();
> DorisExecutionOptions.Builder executionBuilder =
> DorisExecutionOptions.builder();
>
> executionBuilder.setLabelPrefix("FundCategoryFilter-doris"+date.getTime()).setStreamLoadProp(pro);
>
> String[] fields =
> {"uid","subject","trade_date","update_time","value"};
> DataType[] types =
> {DataTypes.VARCHAR(36),DataTypes.VARCHAR(20),DataTypes.DATE(),DataTypes.TIMESTAMP(),DataTypes.DOUBLE()};
>
> builder.setDorisReadOptions(DorisReadOptions.builder().build())
> .setDorisExecutionOptions(executionBuilder.build())
>
> .setSerializer(RowDataSerializer.builder().setFieldNames(fields).setType("json").setFieldType(types).build())
> .setDorisOptions(dorisBuilder.build());
> fundCategoryDataStream.sinkTo(builder.build())
>
> .slotSharingGroup(parameterTool.get("fund_category_data_sink_group",
> "fund_category_sink"))
>
> .setParallelism(parameterTool.getInt("base_data_sink_parallelism", 1))
>
> .uid(parameterTool.get("fundCategroyDataSinkID","fundCategroyDataSinkID_1"))
> .name("fundCategorySinkName”);
>
>
>


Re: Store a state at a RDBMS before TTL passes by

2023-06-15 Thread Shammon FY
Hi Anastasios,

What you want sounds like a session window [1], maybe you can refer to the
doc for more details.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#session-windows

Best,
Shammon FY

On Thu, Jun 15, 2023 at 10:03 PM Anastasios Makris <
anastasios.makris...@gmail.com> wrote:

> Hi Flink users,
>
> I created a KeyedStream that tracks some metrics for each user of my website.
> Each time a user produces an event, the metrics are recomputed and
> change.
> I would like to keep the outcome of a user's session in an RDBMS, which
> will be a single row.
>
> The first and obvious solution would be to insert the row into the RDBMS and
> then update it each time something new occurs.
>
> I would like to ask if another solution is possible.
> For example, could we maybe schedule the RDBMS to be used as a sink for the
> state before its TTL passes?
>
> Best regards,
> Anastasis
>


Re: Flink bulk and record file source format metrics

2023-06-14 Thread Shammon FY
Hi Kamal,

Can you give more information about the metrics you want? In Flink each
source task has one source reader which already has some metrics; you can
refer to the metrics doc [1] for more detailed information.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/
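
If the goal is only to count corrupt records, one possible workaround (not tied to the FileSource internals; the operator and metric names below are made up, and the corruption check is a placeholder) is to register a custom counter in an operator placed right after the source:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class CorruptRecordFilter extends RichFlatMapFunction<String, String> {
    private transient Counter corruptRecords;

    @Override
    public void open(Configuration parameters) {
        // shows up in the operator's metric group in the web UI / metric reporters
        corruptRecords = getRuntimeContext().getMetricGroup().counter("corruptRecords");
    }

    @Override
    public void flatMap(String record, Collector<String> out) {
        if (record == null || record.isEmpty()) { // placeholder corruption check
            corruptRecords.inc();
            return;
        }
        out.collect(record);
    }
}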

Best,
Shammon FY

On Tue, Jun 13, 2023 at 11:13 AM Kamal Mittal via user <
user@flink.apache.org> wrote:

> Hello,
>
>
>
> Using Flink record stream format file source API as below for parquet
> records reading.
>
>
>
> FileSource.FileSourceBuilder source = FileSource.
> *forRecordStreamFormat*(streamformat, path);
>
> source.monitorContinuously(Duration.*ofMillis*(1));
>
>
>
> Want to log/generate metrics for corrupt records, and for that I need to
> log Flink metrics at the *source level* in the parquet reader class. Is there
> any way to do that, since right now no handle for SourceContext is available?
>
>
>
> Rgds,
>
> Kamal
>


Re: Building Dynamic SQL using contents of Map datastructure

2023-06-12 Thread Shammon FY
Hi Yogesh,

I think you need to build the dynamic SQL statement in your service and
then submit the SQL to the Flink cluster.
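
For example, a rough sketch of assembling the column list on the client side before handing the statement to a TableEnvironment (the table and column names are made up for illustration):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DynamicInsertSketch {
    public static void main(String[] args) {
        // hypothetical mapping of target column -> source expression
        Map<String, String> dataMap = new LinkedHashMap<>();
        dataMap.put("id", "id");
        dataMap.put("name", "UPPER(name)");

        String columns = dataMap.values().stream().collect(Collectors.joining(", "));
        String statement = "INSERT INTO sampleSink SELECT " + columns + " FROM Data";

        System.out.println(statement);
        // tableEnv.executeSql(statement);  // submit the assembled statement to Flink
    }
}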

Best,
Shammon FY

On Mon, Jun 12, 2023 at 9:15 PM Yogesh Rao  wrote:

> Hi,
>
> Is there a way we can build a dynamic SQL in Flink from contents of Map ?
>
> Essentially trying to do achieve something like below
>
> StringBuilder builder = new StringBuilder("INSERT INTO sampleSink SELECT "
> );
>
> builder.append("getColumnsFromMap(dataMap), ");
>
> builder.append(" FROM Data").toString();
>
>
> Here getColumnsFromMap is registered as ScalarFunction which would return
> array of Strings basically column names.
>
>
> Regards,
>
> -Yogesh
>


Re: Kubernetes operator listing jobs TimeoutException

2023-06-07 Thread Shammon FY
Hi Evgeniy,

From the following exception message:

at
org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
at
org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
at
org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
at
org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
at
java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)

It seems that the client failed to submit a job to the Flink cluster through
the REST API; maybe you need to provide more information, such as the k8s
config for the job, so the community can help analyze the problem better.


Best,
Shammon FY

On Wed, Jun 7, 2023 at 11:35 PM Evgeniy Lyutikov 
wrote:

> Hello.
> We use Kubernetes operator 1.4.0, operator serves about 50 jobs, but
> sometimes there are errors in the logs that are reflected in the metrics
> (FlinkDeployment.JmDeploymentStatus.READY.Count). What is the reason for
> such errors?
>
>
> 2023-06-07 15:28:27,601 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][job-name/job-name] Starting reconciliation
> 2023-06-07 15:28:27,602 o.a.f.k.o.s.FlinkResourceContextFactory [INFO
> ][job-name/job-name] Getting service for job-name
> 2023-06-07 15:28:27,602 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-name/job-name] Observing job status
> 2023-06-07 15:28:39,623 o.a.f.s.n.i.n.c.AbstractChannel [WARN ]
> Force-closing a channel whose registration task was not accepted by an
> event loop: [id: 0xd494f516]
> java.util.concurrent.RejectedExecutionException: event executor terminated
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825)
> at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81)
> at
> org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139)
> at
> org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:123)
> at
> org.apache.flink.runtime.rest.RestClient.submitRequest(RestClient.java:469)
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:392)
> at
> org.apache.flink.runtime.rest.RestClient.sendRequest(RestClient.java:306)
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$null$37(RestClusterClient.java:931)
> at
> java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:649)
> at
> java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> 2023-06-07 15:28:39,624 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR]
> Failed to submit a listener notification task. Event loop shut down?
> jav

Re: pyflink 1.17 garbled Chinese output

2023-06-07 Thread Shammon FY
Hi,

How are you running it? Could the encoding of the file containing the Chinese text be wrong?

Best,
Shammon FY


On Thu, Jun 8, 2023 at 10:07 AM yidan zhao  wrote:

> Could you describe it in more detail?
>
> 1  wrote on Wed, Jun 7, 2023 at 19:55:
> >
> > Hello, I ran the official pyflink wordcount example. After changing the words to Chinese, the output is garbled.
> >
> >
> >
> >
> >
>


Re: WELCOME to user@flink.apache.org

2023-06-05 Thread Shammon FY
Hi,

Thanks Li Shao, I got it. You can analyze the specific memory usage of
metaspace. In fact, we have also encountered the same problem when running
batch jobs in a session cluster, which resulted in metaspace growth due
to the classloader. I have created a PR [1] for FLINK-32265 [2] to try to
fix this issue.

[1] https://github.com/apache/flink/pull/22718
[2] https://issues.apache.org/jira/browse/FLINK-32265

Best,
Shammon FY


On Tue, Jun 6, 2023 at 8:58 AM Li Shao  wrote:

> Hi Shammon,
>
> Thank you for your reply. My flink job is using batch mode. For streaming
> mode I never see the increasing metaspace.
>
>
> On Mon, Jun 5, 2023 at 5:55 PM Shammon FY  wrote:
>
>> Hi Li Shao,
>>
>> Currently Flink will create a user classloader in JobManager for each job
>> which can only be released by FullGC, I think this is why JVM metaspace is
>> increasing, you can check it.
>> Are you using session mode? I have a small question: Is your job SQL only
>> without UDF or DataStream? Thanks
>>
>> Best,
>> Shammon FY
>>
>> On Tue, Jun 6, 2023 at 4:27 AM Li Shao  wrote:
>>
>>> Hi,
>>>
>>> Recently I noticed my job manager JVM metaspace is keeping increasing
>>> for running batch flink jobs. I found similar stackoverflow post:
>>> https://stackoverflow.com/questions/73184042/apache-flink-job-manager-node-runs-out-of-jvm-metaspace-quickly,
>>> but there is no solution on this. I am wondering if flink can clean up the
>>> job manager JVM metaspace periodically or it does not. Please suggest.
>>>
>>> Thanks,
>>> Li
>>>
>>> Version: 1.14.4 Flink HA mode
>>> JVM Metaspace: 1.88 GB / 2.00 GB
>>>
>>> JVM (Heap/Non-Heap) Memory
>>> TypeCommittedUsedMaximum
>>> Heap 6.00 GB 3.79 GB 6.00 GB
>>> Non-Heap 2.34 GB 2.25 GB 3.23 GB
>>> Outside JVM Memory
>>> TypeCountUsedCapacity
>>> Direct 927 86.9 MB 87.0 MB
>>> Mapped 0 0 B 0 B
>>> Garbage Collection
>>> CollectorCountTime
>>> G1_Young_Generation 1355 57139
>>> G1_Old_Generation 1 1325
>>>

Re: WELCOME to user@flink.apache.org

2023-06-05 Thread Shammon FY
Hi Li Shao,

Currently Flink creates a user classloader in the JobManager for each job,
which can only be released by a full GC; I think this is why the JVM metaspace
keeps increasing. You can check it.
Are you using session mode? I have a small question: Is your job SQL only
without UDF or DataStream? Thanks

Best,
Shammon FY

On Tue, Jun 6, 2023 at 4:27 AM Li Shao  wrote:

> Hi,
>
> Recently I noticed my job manager JVM metaspace is keeping increasing for
> running batch flink jobs. I found similar stackoverflow post:
> https://stackoverflow.com/questions/73184042/apache-flink-job-manager-node-runs-out-of-jvm-metaspace-quickly,
> but there is no solution on this. I am wondering if flink can clean up the
> job manager JVM metaspace periodically or it does not. Please suggest.
>
> Thanks,
> Li
>
> Version: 1.14.4 Flink HA mode
> JVM Metaspace: 1.88 GB / 2.00 GB
>
> JVM (Heap/Non-Heap) Memory
> TypeCommittedUsedMaximum
> Heap 6.00 GB 3.79 GB 6.00 GB
> Non-Heap 2.34 GB 2.25 GB 3.23 GB
> Outside JVM Memory
> TypeCountUsedCapacity
> Direct 927 86.9 MB 87.0 MB
> Mapped 0 0 B 0 B
> Garbage Collection
> CollectorCountTime
> G1_Young_Generation 1355 57139
> G1_Old_Generation 1 1325
>

Re: Bulk storage of protobuf records in files

2023-06-05 Thread Shammon FY
Hi Ryan,

What I usually encounter is writing Protobuf format data to systems such as
Kafka, and I have never encountered writing to a file yet.

Best,
Shammon FY


On Mon, Jun 5, 2023 at 10:50 PM Martijn Visser 
wrote:

> Hey Ryan,
>
> I've never encountered a use case for writing Protobuf encoded files to a
> filesystem.
>
> Best regards,
>
> Martijn
>
> On Fri, May 26, 2023 at 6:39 PM Ryan Skraba via user <
> user@flink.apache.org> wrote:
>
>> Hello all!
>>
>> I discovered while investigating FLINK-32008[1] that we can write to the
>> filesystem connector with the protobuf format, but today, the resulting
>> file is pretty unlikely to be useful or rereadable.
>>
>> There's no real standard for storing many protobuf messages in a single
>> file container, although the documentation mentions writing size-delimited
>> messages sequentially[2].  In practice, I've never encountered protobuf
>> binaries stored on filesystems without using some other sort of "framing"
>> (like how parquet can be accessed with either an Avro or a protobuf
>> oriented API).
>>
>> Does anyone have any use cases for bulk storage of protobuf messages on a
>> filesystem?  Should these files just be considered temporary storage for
>> Flink jobs, or do they need to be compatible with other systems?  Is there
>> a splittable / compressable file format?
>>
>> The alternative might be to just forbid file storage for protobuf
>> messages!  Any opinions?
>>
>> All my best, Ryan Skraba
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-32008
>> [2]: https://protobuf.dev/programming-guides/techniques/#streaming
>>
>


Re: Unsubscribe

2023-06-04 Thread Shammon FY
To unsubscribe, send any email to user-zh-unsubscr...@flink.apache.org; see
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Sun, Jun 4, 2023 at 11:13 PM 张保淇  wrote:

> Unsubscribe
>
>
> Please help handle this as soon as possible
>
>
> Best wishes
> 张保淇
> Phone: +8618878478770
> Email: hzuzhangba...@163.com


Re: Outputting bad records in Flink

2023-05-31 Thread Shammon FY
Hi

You can look at the exact stack trace of the NullPointerException. If it comes from your business code, you can add some checks to your processing logic and print the offending data to the log file; if it is not from your business code, please paste the exact stack trace.
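
As a rough sketch of that (the record type and the parsing step are placeholders for the real business logic), the processing can be wrapped so the offending raw record is logged instead of failing the job:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SafeParseFunction implements FlatMapFunction<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(SafeParseFunction.class);

    @Override
    public void flatMap(String rawRecord, Collector<String> out) {
        try {
            // placeholder for the real parsing / business logic that may throw an NPE
            out.collect(rawRecord.trim().toUpperCase());
        } catch (Exception e) {
            // log the raw record that caused the problem instead of failing the job
            LOG.warn("Dirty record skipped: {}", rawRecord, e);
        }
    }
}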

On Wed, May 31, 2023 at 12:31 PM yidan zhao  wrote:

> You probably have to do this with your own logging: put a try/catch around the places that may throw the NPE and print the original record.
>
> 小昌同学 wrote on Mon, May 29, 2023 at 18:30:
> >
> > Hi, the data source is Kafka and I am using the DataStream API.
> >
> >
> > 小昌同学
> > ccc0606fight...@163.com
> >  Original message 
> > | From | Weihua Hu |
> > | Date | 2023-05-29 15:29 |
> > | To |  |
> > | Subject | Re: Outputting bad records in Flink |
> > Hi,
> >
> > Which data source are you using? Kafka? And are you using Flink SQL or the DataStream API?
> >
> > Could you paste the exception stack trace?
> >
> > Best,
> > Weihua
> >
> >
> > On Mon, May 29, 2023 at 1:36 PM 小昌同学  wrote:
> >
> >
> > Dear all, I have a job that has been running for a long time, but recently some dirty data from the source system made it fail. The YARN logs show a NullPointerException. I would now like to capture that dirty record; is there any way to do this? Thank you for your guidance.
> >
> >
> > 小昌同学
> > ccc0606fight...@163.com
>


Re: Flink web UI display issue

2023-05-30 Thread Shammon FY
Hi,

It seems the attachment or document did not come through; please check and confirm.

Best,
Shammon FY

On Wed, May 31, 2023 at 9:52 AM 小昌同学  wrote:

> Hello everyone, I have a question about the display in the Flink web UI.
> I put screenshots of the abnormal display into an attached document. My confusion is that the web
> UI already shows a watermark, but when I open the detail view it does not show the watermark.
> Thanks for your guidance.
>
> 小昌同学
> ccc0606fight...@163.com
>
>


Re: Backpressure handling in FileSource APIs - Flink 1.16

2023-05-28 Thread Shammon FY
Hi Kamal,

When the job has backpressure, the network buffers of the `FileSource` task
fill up, which blocks the source subtask. You can refer to the network
stack post [1] for more information.

[1]
https://flink.apache.org/2019/06/05/a-deep-dive-into-flinks-network-stack/

Best,
Shammon FY


On Fri, May 26, 2023 at 7:13 PM Kamal Mittal  wrote:

> Hello Shammon,
>
> Can you please point out the classes where like for "FileSource" slow down
> logic is placed?
>
> Just wanted to understand it more better and try it at my end by running
> various perf. runs, also apply changes in my application if any.
>
> Rgds,
> Kamal
>
> On Thu, May 25, 2023 at 9:16 AM Kamal Mittal  wrote:
>
>> Hello Shammon,
>>
>> Can you please point out the classes where like for "FileSource" slow
>> down logic is placed?
>>
>> Just wanted to understand it more better and try it at my end by running
>> various perf. runs, also apply changes in my application if any.
>>
>> Rgds,
>> Kamal
>>
>> On Wed, May 24, 2023 at 11:41 AM Kamal Mittal  wrote:
>>
>>> Thanks Shammon for clarification.
>>>
>>> On Wed, May 24, 2023 at 11:01 AM Shammon FY  wrote:
>>>
>>>> Hi Kamal,
>>>>
>>>> The source will slow down when there is backpressure in the flink job,
>>>> you can refer to docs [1] and [2] to get more detailed information about
>>>> backpressure mechanism.
>>>>
>>>> Currently there's no API or Callback in source for users to do some
>>>> customized operations for backpressure, but users can collect the metrics
>>>> of the job and analysis, for example, the metrics in [1] and [3]. I hope
>>>> this can help you.
>>>>
>>>> [1]
>>>> https://flink.apache.org/2021/07/07/how-to-identify-the-source-of-backpressure/#:~:text=Backpressure%20is%20an%20indicator%20that,the%20queues%20before%20being%20processed
>>>> .
>>>> [2]
>>>> https://www.alibabacloud.com/blog/analysis-of-network-flow-control-and-back-pressure-flink-advanced-tutorials_596632
>>>> [3]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/back_pressure/
>>>>
>>>> On Tue, May 23, 2023 at 9:40 PM Kamal Mittal 
>>>> wrote:
>>>>
>>>>> Hello Community,
>>>>>
>>>>> Can you please share views about the query asked above w.r.t back
>>>>> pressure for  FileSource APIs for Bulk and Record stream formats.
>>>>> Planning to use these APIs w.r.t AVRO to Parquet and vice-versa
>>>>> conversion.
>>>>>
>>>>> Rgds,
>>>>> Kamal
>>>>>
>>>>> On Tue, 23 May 2023, 12:26 pm Kamal Mittal, 
>>>>> wrote:
>>>>>
>>>>>> Added Flink community DL as well.
>>>>>>
>>>>>> -- Forwarded message -
>>>>>> From: Kamal Mittal 
>>>>>> Date: Tue, May 23, 2023 at 7:57 AM
>>>>>> Subject: Re: Backpressure handling in FileSource APIs - Flink 1.16
>>>>>> To: Shammon FY 
>>>>>>
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Yes, want to take some custom actions and also if there is any
>>>>>> default behavior of slowing down sending data in pipeline further or
>>>>>> reading data from source somehow?
>>>>>>
>>>>>> Rgds,
>>>>>> Kamal
>>>>>>
>>>>>> On Tue, May 23, 2023 at 6:06 AM Shammon FY  wrote:
>>>>>>
>>>>>>> Hi Kamal,
>>>>>>>
>>>>>>> If I understand correctly, do you want the source to do some custom
>>>>>>> actions, such as current limiting, when there is backpressure in the 
>>>>>>> job?
>>>>>>>
>>>>>>> Best,
>>>>>>> Shammon FY
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 22, 2023 at 2:12 PM Kamal Mittal 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello Community,
>>>>>>>>
>>>>>>>> Can you please share views about the query asked above w.r.t back
>>>>>>>> pressure for  FileSource APIs for Bulk and Record stream formats.
>>>>>>>> Planning to use these APIs w.r.t AVRO to Parquet and vice-versa
>>>>>>>> conversion.
>>>>>>>>
>>>>>>>> Rgds,
>>>>>>>> Kamal
>>>>>>>>
>>>>>>>> On Thu, May 18, 2023 at 2:33 PM Kamal Mittal 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello Community,
>>>>>>>>>
>>>>>>>>> Does FileSource APIs for Bulk and Record stream formats handle
>>>>>>>>> back pressure by any way like slowing down sending data in piepline 
>>>>>>>>> further
>>>>>>>>> or reading data from source somehow?
>>>>>>>>> Or does it give any callback/handle so that any action can be
>>>>>>>>> taken? Can you please share details if any?
>>>>>>>>>
>>>>>>>>> Rgds,
>>>>>>>>> Kamal
>>>>>>>>>
>>>>>>>>


Re: Sliding windows with a large window and a small slide in Flink SQL

2023-05-28 Thread Shammon FY
Hi,

Is the problem that the amount of data emitted after the window fires is too large? Could increasing the resources and the parallelism of the window computation alleviate it?

Best,
Shammon FY

On Fri, May 26, 2023 at 2:03 PM tanjialiang  wrote:

> Hi, all.
> I ran into some problems when using Flink SQL window TVF sliding windows.
> The slide is 5 minutes and the window is 24 hours, grouped by
> user_id. When the job crashes, or when it consumes from the Kafka earliest offset, checkpoints hardly ever succeed.
> Because consumption starts from earliest, the data quickly fills up the buffers and produces backpressure. At that point a single batch of data may trigger N window firings sent downstream, and the cost of each firing is (user cardinality *
> 24 * 60 / 5), so the checkpoint barrier can stay stuck.
> Is there any way to get out of this situation?
>
>
> best,
> tanjialiang.


Re: How to use Flink SQL to accumulate the day's transaction count, updating the state and emitting downstream on every transaction?

2023-05-26 Thread Shammon FY
Hi

You can combine the day-level time with the other fields you need to aggregate by as the key and use an aggregation operator; by default it emits the updated result right after each record is processed.
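
A rough sketch of that idea (the table and column names are placeholders, and a datagen source is used only to make it self-contained):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DailyCountSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // hypothetical trades source; in practice this would be your Kafka / CDC table
        tEnv.executeSql(
            "CREATE TABLE trades (" +
            "  trade_id BIGINT," +
            "  ts TIMESTAMP(3)" +
            ") WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

        // group by the day-level key; every incoming trade updates trade_cnt and emits downstream
        tEnv.executeSql(
            "SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') AS trade_day, COUNT(*) AS trade_cnt " +
            "FROM trades GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd')").print();
    }
}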

Best,
Shammon FY

On Fri, May 26, 2023 at 3:44 PM casel.chen  wrote:

> 用flink sql如何实现累计当天交易数量每来一笔交易都实时更新状态并输出到下游?


Re: Why I can't run more than 19 tasks?

2023-05-24 Thread Shammon FY
Hi Hemi,

There may be two reasons that I can think of:
1. The number of connections exceeds the MySQL limit; you can check the
options in my.cnf on your MySQL server and increase the max connections.
2. A connection timeout on the MySQL client side; you can try adding
'autoReconnect=true' to the connection url (a sketch follows below).
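
For point 2, a rough sketch assuming the flink-cdc MySqlSource builder you use exposes jdbcProperties and connectionPoolSize (hostnames, credentials, and the pool size are placeholders; please verify these setters exist in your connector version):

import java.util.Properties;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlCdcSourceSketch {
    public static MySqlSource<String> build() {
        Properties jdbcProps = new Properties();
        // ask the MySQL driver to re-establish dropped connections
        jdbcProps.setProperty("autoReconnect", "true");

        return MySqlSource.<String>builder()
                .hostname("10.10.10.111")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("user")
                .password("password")
                .jdbcProperties(jdbcProps)      // assumed to be available in recent flink-cdc versions
                .connectionPoolSize(30)         // enlarge the pool if many tasks share one TM
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}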

Best,
Shammon FY


On Thu, May 25, 2023 at 8:32 AM Hemi Grs  wrote:

> hey everybody,
>
> I have a problem with my apache flink, I am synchronizing from MySQL to
> Elasticsearch but it seems that I can't run more than 19 tasks. it gave me
> this error:
>
> --
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> org.apache.flink.util.FlinkRuntimeException:
> java.sql.SQLTransientConnectionException: connection-pool-10.10.10.111:3306
> - Connection is not available, request timed out after 3ms. at
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:64)
> at
> com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.discoveryCaptureTables(MySqlSnapshotSplitAssigner.java:171)
> ... 12 more
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> java.sql.SQLTransientConnectionException: connection-pool-10.10.10.111:3306
> - Connection is not available, request timed out after 3ms. at
> com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:72)
> at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890) at
> io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885)
> at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:418) at
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:61)
> ... 13 moreCaused by: java.sql.SQLTransientConnectionException:
> connection-pool-10.10.10.111:3306 - Connection is not available, request
> timed out after 3ms.
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100)
> at
> com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:59)
> ... 17 more
> -
>
> I have try adding this 2 lines on flink-conf.yaml but doesn't do anything:
> -
> env.java.opts:
> "-Dcom.ververica.cdc.connectors.mysql.hikari.maximumPoolSize=100"
> flink.connector.mysql-cdc.max-pool-size: 100
> -
>
> does anybody know the solution?
> Additional info, my database is doing fine, because I try creating another
> apache flink server and it can run another 19 tasks, so total there 38
> tasks running and it's doing fine. So how do I run many tasks on 1 server
> and the server still have lots of resources.
>
> Thanks
>


Re: Backpressure handling in FileSource APIs - Flink 1.16

2023-05-23 Thread Shammon FY
Hi Kamal,

The source will slow down when there is backpressure in the flink job; you
can refer to docs [1] and [2] for more detailed information about the
backpressure mechanism.

Currently there's no API or callback in the source for users to do some
customized operations for backpressure, but users can collect the metrics
of the job and analyze them, for example the metrics in [1] and [3]. I hope
this can help you.

[1]
https://flink.apache.org/2021/07/07/how-to-identify-the-source-of-backpressure/#:~:text=Backpressure%20is%20an%20indicator%20that,the%20queues%20before%20being%20processed
.
[2]
https://www.alibabacloud.com/blog/analysis-of-network-flow-control-and-back-pressure-flink-advanced-tutorials_596632
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/back_pressure/

On Tue, May 23, 2023 at 9:40 PM Kamal Mittal  wrote:

> Hello Community,
>
> Can you please share views about the query asked above w.r.t back pressure
> for  FileSource APIs for Bulk and Record stream formats.
> Planning to use these APIs w.r.t AVRO to Parquet and vice-versa conversion.
>
> Rgds,
> Kamal
>
> On Tue, 23 May 2023, 12:26 pm Kamal Mittal,  wrote:
>
>> Added Flink community DL as well.
>>
>> -- Forwarded message -
>> From: Kamal Mittal 
>> Date: Tue, May 23, 2023 at 7:57 AM
>> Subject: Re: Backpressure handling in FileSource APIs - Flink 1.16
>> To: Shammon FY 
>>
>>
>> Hello,
>>
>> Yes, want to take some custom actions and also if there is any default
>> behavior of slowing down sending data in pipeline further or reading data
>> from source somehow?
>>
>> Rgds,
>> Kamal
>>
>> On Tue, May 23, 2023 at 6:06 AM Shammon FY  wrote:
>>
>>> Hi Kamal,
>>>
>>> If I understand correctly, do you want the source to do some custom
>>> actions, such as current limiting, when there is backpressure in the job?
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Mon, May 22, 2023 at 2:12 PM Kamal Mittal  wrote:
>>>
>>>> Hello Community,
>>>>
>>>> Can you please share views about the query asked above w.r.t back
>>>> pressure for  FileSource APIs for Bulk and Record stream formats.
>>>> Planning to use these APIs w.r.t AVRO to Parquet and vice-versa
>>>> conversion.
>>>>
>>>> Rgds,
>>>> Kamal
>>>>
>>>> On Thu, May 18, 2023 at 2:33 PM Kamal Mittal 
>>>> wrote:
>>>>
>>>>> Hello Community,
>>>>>
>>>>> Does FileSource APIs for Bulk and Record stream formats handle back
>>>>> pressure by any way like slowing down sending data in piepline further or
>>>>> reading data from source somehow?
>>>>> Or does it give any callback/handle so that any action can be taken?
>>>>> Can you please share details if any?
>>>>>
>>>>> Rgds,
>>>>> Kamal
>>>>>
>>>>


Re: Backpressure handling in FileSource APIs - Flink 1.16

2023-05-22 Thread Shammon FY
Hi Kamal,

If I understand correctly, do you want the source to do some custom
actions, such as current limiting, when there is backpressure in the job?

Best,
Shammon FY


On Mon, May 22, 2023 at 2:12 PM Kamal Mittal  wrote:

> Hello Community,
>
> Can you please share views about the query asked above w.r.t back pressure
> for  FileSource APIs for Bulk and Record stream formats.
> Planning to use these APIs w.r.t AVRO to Parquet and vice-versa conversion.
>
> Rgds,
> Kamal
>
> On Thu, May 18, 2023 at 2:33 PM Kamal Mittal  wrote:
>
>> Hello Community,
>>
>> Does FileSource APIs for Bulk and Record stream formats handle back
>> pressure by any way like slowing down sending data in piepline further or
>> reading data from source somehow?
>> Or does it give any callback/handle so that any action can be taken? Can
>> you please share details if any?
>>
>> Rgds,
>> Kamal
>>
>


Re: Question about Flink exception handling

2023-05-22 Thread Shammon FY
Hi Sharif,

I would like to know what you want to do with the exception after
catching it. There are different ways for different requirements; for
example, Flink already reports these exceptions.

Best,
Shammon FY


On Mon, May 22, 2023 at 4:45 PM Sharif Khan via user 
wrote:

> Hi, community.
> Can anyone please let me know?
>
> 1. What is the best practice in terms of handling exceptions in Flink jobs?
>
> 2. Is there any way to catch exceptions globally in Flink jobs? Basically,
> I want to catch exceptions from any operators in one place (globally).
>
> my expectation is let's say I have a pipeline
> source-> operator(A) -> operator(B) -> operator(C) -> sink.
> I don't want to write a try-catch for every operator. Is it possible to
> write one try-catch for the whole pipeline?
>
> I'm using the Python version of the Flink API. version 1.16
>
> Thanks in advance.
>
> [image: SELISE]
>
> SELISE Group
> Zürich: The Circle 37, 8058 Zürich-Airport, Switzerland
> Munich: Tal 44, 80331 München, Germany
> Dubai: Building 3, 3rd Floor, Dubai Design District, Dubai, United Arab
> Emirates
> Dhaka: Midas Center, Road 16, Dhanmondi, Dhaka 1209, Bangladesh
> Thimphu: Bhutan Innovation Tech Center, Babesa, P.O. Box 633, Thimphu,
> Bhutan
>
> Visit us: www.selisegroup.com
>
> *Important Note: This e-mail and any attachment are confidential and may
> contain trade secrets and may well also be legally privileged or otherwise
> protected from disclosure. If you have received it in error, you are on
> notice of its status. Please notify us immediately by reply e-mail and then
> delete this e-mail and any attachment from your system. If you are not the
> intended recipient please understand that you must not copy this e-mail or
> any attachment or disclose the contents to any other person. Thank you for
> your cooperation.*
>


Re: Getting exception when writing to parquet file with generic types disabled

2023-05-18 Thread Shammon FY
Hi Aniket,

Currently the filesystem connector does not support option
'pipeline.generic-types'='false', because the connector will output
`PartitionCommitInfo` messages for the downstream partition committer
operator even when there are no partitions in the sink table. There is a
`List partitions` field in `PartitionCommitInfo` which will cause
the exception you mentioned in the thread. I have created an issue [1] for
this.

[1] https://issues.apache.org/jira/browse/FLINK-32129

Best,
Shammon FY


On Thu, May 18, 2023 at 9:20 PM Aniket Sule 
wrote:

> Hi,
>
> I am trying to write data to parquet files using SQL insert statements.
> Generic types are disabled in the execution environment.
>
> There are other queries running in the same job that are
> counting/aggregating data. Generic types are disabled as a performance
> optimization for those queries.
>
>
>
> In this scenario, whenever I try to insert data into parquet files, I get
> an exception -
>
> Caused by: java.lang.UnsupportedOperationException: Generic types have
> been disabled in the ExecutionConfig and type java.util.List is treated as
> a generic type.
>
>
>
> I get the above exception even when I test with a simple table that has no
> array or list data types.
>
>
>
> Is there any way to write parquet files with generic types disabled?
>
>
>
> Thanks and regards,
>
> Aniket Sule.
>
>
>
>
>
> Here is a way to reproduce what I am seeing.
>
> My actual source is Kafka with data that is in json format.
>
> Datagen is simply to quickly reproduce the scenario.
>
> The environment is Flink 1.17.0.
>
> I am using the SQL cli.
>
>
>
> set 'sql-client.verbose'='true';
>
> set 'table.exec.source.idle-timeout'='1000';
>
> set 'table.optimizer.join-reorder-enabled'='true';
>
> set 'table.exec.mini-batch.enabled'='true';
>
> set 'table.exec.mini-batch.allow-latency'='5 s';
>
> set 'table.exec.mini-batch.size'='5000';
>
> set 'table.optimizer.agg-phase-strategy'='TWO_PHASE';
>
> set 'table.optimizer.distinct-agg.split.enabled'='true';
>
> set 'table.exec.state.ttl'='360 s';
>
> set 'pipeline.object-reuse'='true';
>
> set 'pipeline.generic-types'='false';
>
> set 'table.exec.deduplicate.mini-batch.compact-changes-enabled'='true';
>
>
>
> CREATE TABLE source_t (
>
>   order_number BIGINT,
>
>   order_name string,
>
>   risk float,
>
>   order_time   TIMESTAMP(3)
>
>   ) WITH (
>
> 'connector' = 'datagen'
>
>   );
>
>
>
> CREATE TABLE file_t (
>
>   order_number BIGINT,
>
>   order_name string,
>
>   risk float,
>
>   `year` string,`month` string,`day` string,`hour` string
>
>   ) WITH (
>
> 'connector'='filesystem',
>
> 'path' = '/tmp/data',
>
> 'format'='parquet'
>
>   );
>
>
>
> insert into file_t
>
> select order_number,order_name,risk ,
>
> date_format(order_time,'yyyy') as `year`, date_format(order_time,'MM') as
> `month`, date_format(order_time,'dd') as `day`, date_format(order_time,'HH')
> as `hour`
>
> from source_t;
>
>
>
> Resulting exception:
>
> [ERROR] Could not execute SQL statement. Reason:
>
> org.apache.flink.runtime.rest.util.RestClientException: [Internal server
> error., 
> org.apache.flink.table.gateway.api.utils.SqlGatewayException:
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> fetchResults.
>
> at
> org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
>
> at
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
>
> at
> org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
>
> at
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
>
> at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
>
> at
> java.base/java.util.Optional.ifPresent(Optional.java:183)
>
> at
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
>
> at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
>
> at
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetriev

Re: Versioned view created with Flink SQL cannot be used properly

2023-05-17 Thread Shammon FY
Hi,

The images in your email cannot be displayed, so there is no way to see the specific error message.

Best,
Shammon FY


On Thu, May 18, 2023 at 10:15 AM arkey w  wrote:

> Flink version: 1.14.5
> When using versioned tables in our project, we wanted to use a versioned view,
> but it does not work after creation. We then verified against the example from
> the official documentation (  Versioned Tables | Apache Flink
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/versioned_tables/>
> ), and it still does not work. The create SQL is as follows:
> Create the fact table:
> [image: image.png]
>
> Create the versioned view:
> [image: image.png]
> [image: image.png]
>
>
> The temporal join result reports an error:
> [image: image.png]
>
> When describing the view, we found it has no primary key and no event-time
> field, which is why the join reports the error.
> Did I do something wrong somewhere? How can I use versioned views correctly?
>
>


Re: Query on RestartPipelinedRegionFailoverStrategy

2023-05-16 Thread Shammon FY
Hi Prabhu,

Whether tasks end up in the same region depends on the DistributionPattern
between upstream and downstream operators. For example, if the
DistributionPattern from A to B is ALL_TO_ALL, all subtasks of A and B
will be in the same region. Otherwise, if the DistributionPattern is
POINTWISE, the JobManager creates independent point-to-point relations
between subtasks and puts each group of related subtasks into its own region.
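
As a rough illustration (a sketch only; the class and names are made up),
forward/POINTWISE connections keep the two parallel pipelines a1..e1 and
a2..e2 in separate regions, while a keyBy()/rebalance() introduces ALL_TO_ALL
edges that merge everything into a single pipelined region:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RegionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStream<Long> a = env.fromSequence(0, 1_000_000);
        // forward (POINTWISE) edge: a1->b1 and a2->b2 stay independent
        DataStream<Long> b = a.map(v -> v + 1).returns(Types.LONG);
        // rebalance() creates an ALL_TO_ALL edge: every upstream subtask connects
        // to every downstream subtask, so the whole graph collapses into one
        // pipelined region and a single task failure restarts all tasks
        DataStream<Long> c = b.rebalance().map(v -> v * 2).returns(Types.LONG);

        c.print();
        env.execute("region-sketch");
    }
}

So if all edges of your job a -> b -> c -> d -> e are forward/rescale, failing
TM1 should only restart a1..e1; any keyBy, rebalance or broadcast edge will
pull both parallel pipelines into the same region.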

Best,
Shammon FY

On Tue, May 16, 2023 at 6:42 PM Prabhu Joseph 
wrote:

> Yes, I expected the same. But all the tasks go into one region and
> RestartPipelinedRegionFailoverStrategy restarts all of them. I see this
> strategy does not make any difference from RestartAllFailoverStrategy in
> stream execution mode. It could only help in Batch execution mode, where
> the Blocking result type is used, which forms multiple different regions.
>
> I will come up/modify the strategy which returns two different regions and
> validate if it works without any impact.
>
>
> On Tue, 16 May, 2023, 8:09 am weijie guo, 
> wrote:
>
>> Hi Prabhu,
>>
>> If the edges between a -> b -> c -> d -> e are all point-wise, then in
>> theory it should form two regions.
>>
>> Best regards,
>>
>> Weijie
>>
>>
>> Prabhu Joseph  于2023年5月15日周一 09:58写道:
>>
>>> Hi, I am testing the Flink Fine-Grained Recovery
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures>
>>> from Task Failures on Flink 1.17 and am facing some issues where I need
>>> some advice. Have a jobgraph below with 5 operators, and all connections
>>> between operators are pipelined and the job's parallelism.default is set to
>>> 2. Have configured RestartPipelinedRegionFailoverStrategy with Exponential
>>> Delay Restart Strategy.
>>>
>>> A -> B -> C -> D -> E
>>>
>>> There are a total of 10 tasks running. The first pipeline  (a1 to e1)
>>> runs on a TaskManager (say TM1), and the second pipeline (a2 to e2) runs on
>>> another TaskManager (say TM2).
>>>
>>> a1 -> b1 -> c1 -> d1 -> e1
>>> a2 -> b2 -> c2 -> d2 -> e2
>>>
>>> When TM1 failed, I expected only 5 tasks (a1 to e1) would fail and they
>>> alone would be restarted, but all 10 tasks are getting restarted. There is
>>> only one pipeline region, which consists of all 10 execution vertices, and
>>> RestartPipelinedRegionFailoverStrategy#getTasksNeedingRestart returns all
>>> 10 tasks. Is it the right behaviour, or could there be any issue? Is it
>>> possible to restart only the pipeline of the failed task (a1 to e1) without
>>> restarting other parallel pipelines.
>>>
>>> Thanks,
>>> Prabhu Joseph
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: Re: Error reported as a bug

2023-05-15 Thread Shammon FY
Hi,

From the error, it looks like some string field in your job is being treated as a timestamp, which makes the job's code generation fail. Your job logic is fairly complex; please go through the time-related fields and check whether their types are handled correctly, for example the eventTime field.
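
If that is the case, one option (a sketch only, assuming a StreamTableEnvironment
named tableEnv, Flink 1.13+, and that the time field in your BaseInfo POJO is a
String such as "2023-05-15 18:29:15") is to declare the column as STRING and
convert it explicitly instead of mapping it to TIMESTAMP directly:

Schema schema = Schema.newBuilder()
        .column("eventTime", DataTypes.STRING())
        // convert the string explicitly instead of letting codegen cast it
        .columnByExpression("rowTime", "TO_TIMESTAMP(eventTime)")
        .watermark("rowTime", "rowTime - INTERVAL '5' SECOND")
        .build();
Table table = tableEnv.fromDataStream(baseInfoStream, schema);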

Best,
Shammon FY

On Mon, May 15, 2023 at 7:29 PM lxk  wrote:

> Hi, from the error message this is caused by incompatible types.
> Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column
> 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime"
> You could try adjusting the table schema, or use the appropriate functions to
> convert the field type.
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> At 2023-05-15 18:29:15, "小昌同学"  wrote:
> >|
> >package job;
> >import bean.BaseInfo;
> >import bean.MidInfo;
> >import bean.OutInfo;
> >import bean.ResultInfo;
> >import com.alibaba.fastjson.JSON;
> >import com.alibaba.fastjson.JSONObject;
> >import config.FlinkConfig;
> >import function.MyProcessFunction;
> >import org.apache.flink.api.common.functions.MapFunction;
> >import org.apache.flink.api.common.serialization.SimpleStringSchema;
> >import org.apache.flink.api.java.tuple.Tuple2;
> >import org.apache.flink.streaming.api.TimeCharacteristic;
> >import org.apache.flink.streaming.api.datastream.DataStream;
> >import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >import
> org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> >import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> >import org.apache.flink.table.api.DataTypes;
> >import org.apache.flink.table.api.Schema;
> >import org.apache.flink.table.api.Table;
> >import org.apache.flink.table.api.TableSchema;
> >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> >import org.apache.flink.table.types.DataType;
> >import org.apache.flink.util.OutputTag;
> >import sink.Sink2Mysql;
> >import utils.DateUtil;
> >import utils.DateUtils;
> >import utils.JdbcUtil;
> >
> >import java.sql.Connection;
> >import java.sql.PreparedStatement;
> >import java.sql.ResultSet;
> >import java.time.*;
> >import java.util.Date;
> >import java.util.HashMap;
> >import java.util.Properties;
> >
> >public class RytLogAnly4 {
> >public static void main(String[] args) throws Exception {
> >StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> >env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >
> >//use side-output streams
> >OutputTag requestStream = new
> OutputTag("requestStream") {
> >};
> >OutputTag answerStream = new
> OutputTag("answerStream") {
> >};
> >
> >//1. connect to the Kafka data in the test environment
> >String servers =
> FlinkConfig.config.getProperty("dev_bootstrap.servers");
> >String topicName =
> FlinkConfig.config.getProperty("dev_topicName");
> >String groupId = FlinkConfig.config.getProperty("dev_groupId");
> >String devMode = FlinkConfig.config.getProperty("dev_mode");
> >Properties prop = new Properties();
> >prop.setProperty("bootstrap.servers", servers);
> >prop.setProperty("group.id", groupId);
> >prop.setProperty("auto.offset.reset", devMode);
> >DataStreamSource sourceStream = env.addSource(new
> FlinkKafkaConsumer(topicName, new SimpleStringSchema(), prop));
> >//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 --
> <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBwACABYAAACuAgCuAgAATQFIAAFSMDEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"}
> >
> >//2. process the source data and generate BaseInfo records
> >SingleOutputStreamOperator baseInfoStream =
> sourceStream.map(new MapFunction() {
> >@Override
> >public BaseInfo map(String value) throws Exception {
> >JSONObject jsonObject = JSON.parseObject(value);
> >//get the different server IPs
> >String serverIp = jsonObject.getString("ip");
> >//get the different data

Re: StreamTable Environment initialized failed -- "Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath"

2023-05-15 Thread Shammon FY
Hi krislee,

I think you can try to add the jars under FLINK_HOME/lib to the classpath of
your command, for example:

1. mvn exec:java -Dexec.mainClass="xxx" -Dexec.classpathScope="compile"
2. java -cp "target/xxx.jar:$FLINK_HOME/lib/*" xxx

Note that '-cp' is ignored when '-jar' is used, so the main class has to be
given explicitly in the second form. Or you can run your application in the
IDE directly after the planner dependency has been added.
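
If the application is built with Maven, a minimal sketch of the planner/runtime
dependencies (these artifact names assume Flink 1.15 or later; for older
versions use flink-table-planner with your Scala suffix instead) is:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
</dependency>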

Best,
Shammon FY

On Mon, May 15, 2023 at 10:18 PM Lucifer_jl  wrote:

> hi:
> I think there may be the following reasons:
> 1. the scope of the dependency is 'provided' in your "pom.xml". When you
> package, this dependency will not be included in the jar, and the execution
> environment of your xxx.jar lacks this dependency.
> 2. I cannot see the "pom.xml" completely; can you add the dependency
> 
> org.apache.flink
> flink-table-api-scala-bridge_2.11
> 1.9.0
> 
> good luck
>


Re: JVM Metaspace for Task Managers and Job Managers are not getting released.

2023-05-14 Thread Shammon FY
Hi Ajinkya,

The command 'jcmd <pid> GC.run' can be used to trigger a full GC on a JVM
process. However, note that this may have a performance impact on the ongoing
computation.
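
For example (a sketch; the process names below assume a standalone/session
deployment, so adjust them to your setup and run the commands where the
process actually lives):

jps | grep -E 'TaskManagerRunner|StandaloneSessionClusterEntrypoint'   # find the PIDs
jcmd <pid> GC.run                                                      # trigger a full GC

Afterwards you can check the metaspace metric of the process again, e.g. in
the Flink web UI.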

Best,
Shammon FY


On Mon, May 15, 2023 at 10:51 AM Ajinkya Pathrudkar <
ajinkya.pathrudka...@gmail.com> wrote:

> Hi Shammon,
>
> When you say FullGC, are you referring to calling System.gc()? And one
> more thing I am struggling to find is how I can trigger a full GC for the
> JobManager and TaskManager?
>
> Thanks,
> Ajinkya
>
> On Sun, May 14, 2023 at 10:40 PM Ajinkya Pathrudkar <
> ajinkya.pathrudka...@gmail.com> wrote:
>
>> Ok, I will check.
>>
>> On Sun, May 14, 2023 at 9:39 PM Shammon FY  wrote:
>>
>>> Hi Ajinkya,
>>>
>>> The memory of metaspace may need to be released through FullGC, you can
>>> try to trigger fullgc manually in JobManager and TaskManager, and check
>>> whether the metaspace is released.
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Sat, May 13, 2023 at 4:01 PM Jiadong lu  wrote:
>>>
>>>> Hi, Ajinkya
>>>>
>>>> Maybe some threads in your job were not shut down when the job was
>>>> closed?
>>>>
>>>> Best,
>>>> Jiadong Lu
>>>>
>>>> On 2023/5/13 4:58, Ajinkya Pathrudkar wrote:
>>>> > Hello,
>>>> >
>>>> > I am observing JVM Metaspace memory for Task Managers and Job Manager
>>>> is
>>>> > not getting released. Any thoughts?
>>>> >
>>>> > image.png
>>>> >
>>>> >
>>>> > Thanks,
>>>> > Ajinkya
>>>>
>>> --
>> Thanks & Regards,
>> Ajinkya Pathrudkar
>>
> --
> Thanks & Regards,
> Ajinkya Pathrudkar
>


Re: JVM Metaspace for Task Managers and Job Managers are not getting released.

2023-05-14 Thread Shammon FY
Hi Ajinkya,

The metaspace memory may only be released through a full GC; you can try to
trigger a full GC manually in the JobManager and TaskManager and check whether
the metaspace is released.

Best,
Shammon FY

On Sat, May 13, 2023 at 4:01 PM Jiadong lu  wrote:

> Hi, Ajinkya
>
> Maybe some threads in your job were not shut down when the job was closed?
>
> Best,
> Jiadong Lu
>
> On 2023/5/13 4:58, Ajinkya Pathrudkar wrote:
> > Hello,
> >
> > I am observing JVM Metaspace memory for Task Managers and Job Manager is
> > not getting released. Any thoughts?
> >
> > image.png
> >
> >
> > Thanks,
> > Ajinkya
>


Re: Flink state configuration

2023-05-14 Thread Shammon FY
Hi,

"如果不对于状态进行管理,后续程序会出现问题"是指状态会变得太大?如果是这样,可以在group
by的字段里增加一个天级的时间戳,这样就不会由于key被更新导致的状态过期失效问题
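
For example (a sketch based on the SQL below; stat_date is a made-up column
name, and it assumes outTime is a TIMESTAMP, so adjust the expression if it is
a string or epoch value):

select funcId, funcIdDesc, serverIp,
       cast(min(maxTime - minTime) as varchar(200)) as minTime, pk, stat_date
from (
  select a.funcId, a.funcIdDesc, a.serverIp,
         b.outTime as maxTime, a.outTime as minTime,
         concat(a.funcId, a.serverIp) as pk,
         DATE_FORMAT(a.outTime, 'yyyy-MM-dd') as stat_date  -- day-level part of the key
  from tableRequest a
  inner join tableAnswer b on a.handleSerialNo = b.handleSerialNo
)
group by funcId, funcIdDesc, serverIp, pk, stat_date

Combined with setIdleStateRetention(Duration.ofDays(1)), yesterday's groups stop
being updated (their day key never appears again), so their state can expire
even if the same funcId/serverIp shows up again today.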

Best,
Shammon FY


On Fri, May 12, 2023 at 1:59 PM 小昌同学  wrote:

> Hi everyone, the Flink SQL I am using is "
> select funcId,funcIdDesc,serverIp,cast(min(maxTime-minTime) as
> varchar(200)) as minTime,pk from
> (
>  select
>   a.funcId as funcId ,
>   a.funcIdDesc as funcIdDesc,
>   a.serverIp as serverIp,
>   b.outTime as maxTime,
>   a.outTime as minTime,
>   concat(a.funcId,a.serverIp) as pk
>  from tableRequest a
>  inner join tableAnswer b
>  on a.handleSerialNo=b.handleSerialNo
> )
> group by funcId,funcIdDesc,serverIp,pk"
>
> Considering that the program will have problems later if the state is not managed, the state management I want to achieve is: the SQL above only computes data for the current day (24 hours), and on the next day all previous state should be cleared. In this scenario, which parameters can I set to manage the state? I set "tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L));" myself, but according to the documentation this looks problematic: the idle-state retention timer is reset whenever a key is updated, whereas what I want is that everything that does not belong to today is cleaned up, regardless of whether it has been updated or not.
>
>
> | |
> 小昌同学
> |
> |
> ccc0606fight...@163.com
> |


Re: How to know when a pipeline ends

2023-05-11 Thread Shammon FY
Hi Luke,

Maybe you can get a 'JobClient' after submitting the job and check the job
status with 'JobClient.getJobStatus()'.
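
For example (a sketch; doSomeCleanupTasks() is your own method, JobClient is
org.apache.flink.core.execution.JobClient, and this assumes Flink 1.12+):

// instead of env.execute(jobName);
JobClient jobClient = env.executeAsync(jobName);
// blocks until the bounded pipeline has processed all data and finished
jobClient.getJobExecutionResult().get();
doSomeCleanupTasks();

Alternatively, you can poll jobClient.getJobStatus() until it returns
JobStatus.FINISHED.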

Best,
Shammon FY


On Fri, May 12, 2023 at 2:58 AM Luke Xiong  wrote:

> Hi,
>
> My flink job needs to do something when the pipeline execution has ended.
> The job code is like this:
>
> createSomeStream().applySomeOperators();
> env.execute(jobName);
> doSomeCleanupTasks();
>
> It looks like doSomeCleanupTasks() can be called while the pipeline is
> still running. The job is for processing a bounded stream, so it doesn't
> run forever. Is it possible to achieve this so doSomeCleanupTasks is called
> only when the pipeline has processed all the data? This happens when the
> runtime mode is STREAMING. Would running it in BATCH mode make any
> difference?
>
> Regards,
> Luke
>
>
>


Re: flink 1.13 partition.time-extractor.timestamp-pattern format

2023-05-10 Thread Shammon FY
Hi,

As described in the documentation above, if the partition is composed of multiple fields, you can use partition.time-extractor.timestamp-pattern in the DDL to assemble those fields into the partition-time format you need, for example:
CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector'='filesystem',
  'path'='...',
  'format'='parquet',
  'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00'
);

If there is only a single timestamp field and you want to convert it to another time format, you can follow the example in the documentation [1]: implement your own PartitionTimeExtractor and specify it via partition.time-extractor.class.
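
A rough sketch for your dt='20200520' layout (assuming Flink 1.13, where the
interface lives in org.apache.flink.table.filesystem; in newer versions the
package is org.apache.flink.connector.file.table):

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import org.apache.flink.table.filesystem.PartitionTimeExtractor;

public class CompactDatePartitionTimeExtractor implements PartitionTimeExtractor {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyyMMdd");

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // assumes a single partition column like dt='20200520'
        return LocalDate.parse(partitionValues.get(0), FMT).atStartOfDay();
    }
}

Then set 'partition.time-extractor.kind'='custom' and
'partition.time-extractor.class'='<your package>.CompactDatePartitionTimeExtractor'
in the table options.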

Since Flink 1.15 [2], partition.time-extractor.timestamp-formatter is also supported, which converts the format of the partition timestamp assembled by timestamp-pattern.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-time-extractor
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/filesystem/#%e5%88%86%e5%8c%ba%e6%97%b6%e9%97%b4%e6%8f%90%e5%8f%96%e5%99%a8

Best,
Shammon FY

On Wed, May 10, 2023 at 5:42 PM 莫失莫忘  wrote:

>
> My Hive partition format is dt='20200520', but a Flink SQL streaming job
> writing to Hive only supports the 'yyyy-MM-dd hh:mm:ss' pattern. How can I
> specify the partition.time-extractor.timestamp-pattern format as
> 'yyyyMMdd hh:mm:ss'? The Flink version is 1.13.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/filesystem/#partition-commit
>
>
>
>
>
>
> --
>
>
>


Re: flink1.16.1 jdbc-3.1.0-1.16 There is a problem trying left join

2023-05-10 Thread Shammon FY
Hi Yangxueyong,

Are you sure this is your Flink SQL job? This SQL statement looks strange:
the table 'test_flink_res2' is used as both source and sink, and the query
only keeps rows where the join key 'b.id' is null.

Best,
Shammon FY

On Wed, May 10, 2023 at 12:54 PM yangxueyong 
wrote:

> flink1.16.1
>
> mysql8.0.33
>
> jdbc-3.1.0-1.16
>
>
> I have a sql,
>
> insert into test_flink_res2(id,name,address)
> select a.id,a.name,a.address from test_flink_res1 a left join
> test_flink_res2 b on a.id=b.id where a.name='abc0.11317691217472489' and
> b.id is null;
>
> *Why does flinksql convert this statement into the following statement?*
>
> SELECT `address` FROM `test_flink_res1` WHERE ((`name` =
> 'abc0.11317691217472489')) AND ((`id` IS NULL))
>
> *As a result, there is no data in test_flink_res2,why?*
>
>
>
>


Re: Flink Kafka Source rebalancing - 1.14.2

2023-05-09 Thread Shammon FY
Hi Madan,

Could you give the old and new versions of Flink and provide the job plan?
I think it will help the community find the root cause.

Best,
Shammon FY

On Wed, May 10, 2023 at 2:04 AM Madan D via user 
wrote:

> Hello Team,
>
> We have been using the Flink Kafka consumer and recently we have been moving
> to the Flink Kafka source to get more advanced features, but we have been
> observing more rebalances right after data is consumed and moved to the next
> operator than with the Flink Kafka consumer.
>
> Can you please let us know what might be causing this, even though we are
> using the same parallelism in the old and new jobs? We are on Flink 1.14.2.
>
>
> Regards,
> Madan
>


Re: How to implement a payment reconciliation timeout alert with Flink SQL?

2023-05-09 Thread Shammon FY
Hi

If you use CEP, you can union the two streams into one stream and then match the different event types via subtype when defining the CEP Pattern, for example:
DataStream<Event> s1 = ...;
DataStream<Event> s2 = ...;
DataStream<Event> s = s1.union(s2);
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
    .subtype(E1.class)
    .where(...)
    .followedBy("second")
    .subtype(E2.class)
    .where(...);

If you use Flink SQL, you can implement it directly with a join between the two streams plus a window, e.g. an interval join.
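
A rough Flink SQL sketch (table and column names are made up; it assumes both
tables expose an order_id and an event-time attribute pay_time with a
watermark defined):

-- platform-side orders with no matching third-party record within 30 minutes
SELECT p.order_id, p.pay_time
FROM PlatformPayment AS p
LEFT JOIN ThirdPartyPayment AS t
  ON p.order_id = t.order_id
 AND t.pay_time BETWEEN p.pay_time AND p.pay_time + INTERVAL '30' MINUTE
WHERE t.order_id IS NULL;

The null-padded rows of the left interval join are only emitted once the
30-minute bound has passed (driven by the watermark), so the rows matching the
WHERE clause are the transactions to alert on.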

Best,
Shammon FY




On Wed, May 10, 2023 at 2:24 AM casel.chen  wrote:

> Requirement: the business side implements a payment feature, and we need to
> do real-time reconciliation with Flink SQL against the transaction data of a
> third-party payment platform, raising an alert when the third-party
> platform's transaction data has not arrived within 30 minutes.
> How can this dual-stream real-time reconciliation scenario be implemented with Flink CEP SQL?
>
> The examples found online are all based on a single stream, while the scenario above uses two streams: PlatformPaymentStream and ThirdPartyPaymentStream.


Re: akka.remote.OversizedPayloadException after we upgrade to Flink 1.15

2023-05-08 Thread Shammon FY
Hi Wei,

From the error message, I guess the reason for the issue is that the events
sent by the SplitEnumerator to the source readers exceed the default akka
frame size. You can set the option 'akka.framesize' to increase the maximum
akka message size, or try to decrease the event size.
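
For example, in flink-conf.yaml (the value is only an illustration; it must be
larger than the ~34 MB payload reported in your log):

akka.framesize: 52428800b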

When you use 'FlinkKafkaConsumer' to read data from Kafka, the source
subtasks in the TaskManagers connect to Kafka and read the data directly, but
'KafkaSource' does not work that way. You can refer to FLIP-27 [1] for more
detailed information about 'KafkaSource'. Simply speaking, the
'SplitEnumerator' in the JobManager gets splits from Kafka and sends them to
the source subtasks, which then read the data.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Shammon FY


On Tue, May 9, 2023 at 2:37 AM Wei Hou via user 
wrote:

> Hi Team,
>
> We hit an issue after we upgrade our job from Flink 1.12 to 1.15,  there's
> a consistent akka.remote.OversizedPayloadException after job restarts:
>
> Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to
> Actor[akka.tcp://flink@xxx/user/rpc/taskmanager_0#-311495648]: max
> allowed size 10485760 bytes, actual size of encoded class
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 33670549
> bytes.
>
>
> In the job, We changed the kafka consumer from FlinkKafkaConsumer to the
> new KafkaSource, and we noticed there's a stackoverflow (
> https://stackoverflow.com/questions/75363084/jobs-stuck-while-trying-to-restart-from-a-checkpoint
> )  talking about _metadata file size kept doubling after that change.
>
> We later checked the _metadata for our own job, it did increase a lot for
> each restart, (around 128 MB when we hit the akka error). I'd like to see
> if there's a known root cause for this problem and what can we do here to
> eliminate it?
>
>
> Best,
> Wei
>


Re: Flink SQL Async UDF

2023-05-07 Thread Shammon FY
Hi Giannis,

Have you use "CREATE FUNCTION asyncfn AS 'Your full class name of async
function class'" or "CREATE TEMPORARY FUNCTION asyncfn AS 'Your full class
name of async function class'" to create a customized function named
"asyncfn" before it is used in your sql?

The error message "No match found for function signature" usually indicates
that the function does not exist or the parameters do not match.
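
For reference, a minimal sketch of the class shape (names are made up, and
note that AsyncTableFunction is mostly used as the lookup function of a
connector, so the direct CREATE FUNCTION path may not be supported in your
version):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.types.Row;

public class AccountLookupFunction extends AsyncTableFunction<Row> {

    public void eval(CompletableFuture<Collection<Row>> future, String accountId) {
        // eval must return immediately; the planner waits on the future, which
        // is completed asynchronously with the matching rows
        CompletableFuture.runAsync(() ->
                future.complete(Collections.singletonList(Row.of(accountId, 0.0d))));
    }
}

So yes, the CompletableFuture parameter in eval is how results are handed
back; the return value of eval itself is void.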

Best,
Shammon FY

On Sun, May 7, 2023 at 2:55 PM Giannis Polyzos 
wrote:

> I can't really find any examples / docs for Flink's AsyncTableFunction and
> I have a hard time getting it to work.
> Is there any example you can share that just takes as input a String key
> and outputs lets say a record (returned by the lookup?)
> Also is not clear to me how the async happens internally.
> Is the future in the eval method signature used?
>
> I tried implementing eval methods like:
> *public final void eval(CompletableFuture> future,
> Object... keys)*
>
> *or *
>
>
> *public void eval(CompletableFuture> result, String
> rowkey)*
>
> but in both cases if I do something like
> *SELECT asyncfn(accountId) from transactions;*
> I get
> *org.apache.calcite.sql.validate.SqlValidatorException: No match found for
> function signature asyncfn()*
>
> Not sure what I am missing
>
> Thanks,
> Giannis
>


Re: Can flink1.15.2 use flink sql to create a broadcast table? I didn't find anything related in https://flink.apache.org/

2023-05-07 Thread Shammon FY
Hi yxy,

As Hang mentioned, I think a lookup join matches your requirements too. You
can refer to the doc [1] for more detailed information.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join
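
A rough sketch (table and column names are made up; the ratio is kept in a
MySQL table and looked up per transaction, so updating the row in MySQL
changes the ratio without restarting the job; proc_time is assumed to be a
processing-time attribute of the transactions table):

CREATE TABLE rebate_ratio (
  ratio_key STRING,
  ratio DECIMAL(10, 6),
  PRIMARY KEY (ratio_key) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://host:3306/db',
  'table-name' = 'rebate_ratio',
  'lookup.cache.max-rows' = '1000',
  'lookup.cache.ttl' = '1 min'
);

SELECT t.order_id, t.amount * r.ratio AS rebate
FROM transactions AS t
JOIN rebate_ratio FOR SYSTEM_TIME AS OF t.proc_time AS r
  ON t.ratio_key = r.ratio_key;

The lookup cache TTL controls how quickly a changed ratio takes effect.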

Best,
Shammon FY


On Sat, May 6, 2023 at 5:54 PM Hang Ruan  wrote:

> Hi, yxy,
>
> I think this scenario could be resolved by a lookup join or a UDF. We can
> store the ratio in the mysql table. Then we could read it by a lookup join
> or implement a UDF to read the ratio.
>
> Best,
> Hang
>
> yxy  于2023年5月6日周六 15:14写道:
>
>> Hello, we have a business scenario.  We have a real-time process to
>> calculate how much red envelopes should be given to them for each
>> transaction.  For example, if a customer pays $100, we will give him a
>> rebate of one thousandth.  We currently use flinksql to Realize this
>> function, but we found that flinksql cannot dynamically adjust this ratio.
>> We want to know can flinksql implement broadcast tables like this?
>> Broadcast the ratio?
>
>


Re: MSI Auth to Azure Storage Account with Flink Apache Operator not working

2023-05-05 Thread Shammon FY
Hi DEROCCO,

I think you can check the startup command of the job on k8s to see if the
jar file is on the classpath.

If your job is a DataStream job, you need to add the hadoop-azure dependency
to your project; if it is a SQL job, you need to include the jar in your Flink
distribution. Alternatively, you can add the package to your cluster
environment.
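
For the DataStream case, a minimal sketch of the Maven dependency (the version
is only an example; match it to the Hadoop version used by your image):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-azure</artifactId>
    <version>3.3.4</version>
</dependency>

For the plugin route, the flink-azure-fs-hadoop jar should end up under
plugins/azure-fs-hadoop/ in the container (the ENABLE_BUILT_IN_PLUGINS
variable in your setup is supposed to do that); you can check the container
startup logs to confirm the plugin was actually loaded.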

Best,
Shammon FY


On Fri, May 5, 2023 at 10:21 PM DEROCCO, CHRISTOPHER  wrote:

> How can I add the package to the flink job or check if it is there?
>
>
>
> *From:* Shammon FY 
> *Sent:* Thursday, May 4, 2023 9:59 PM
> *To:* DEROCCO, CHRISTOPHER 
> *Cc:* user@flink.apache.org
> *Subject:* Re: MSI Auth to Azure Storage Account with Flink Apache
> Operator not working
>
>
>
> Hi DEROCCO,
>
>
>
> I think you need to check whether there is a hadoop-azure jar file in the
> classpath of your flink job. From an error message '*Caused by:
> java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.*', your
> flink job may be missing this package.
>
>
>
> Best,
>
> Shammon FY
>
>
>
>
>
> On Fri, May 5, 2023 at 4:40 AM DEROCCO, CHRISTOPHER 
> wrote:
>
>
>
> I receive the error:  *Caused by: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.*
>
> I’m using flink 1.16 running in Azure Kubernetes using the Flink Apache
> Kubernetes Operator.
>
> I have the following specified in the spec.flinkConfiguration: as per the
> Apache Kubernetes operator documentation.
>
>
>
> fs.azure.createRemoteFileSystemDuringInitialization: "true"
>
> fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net: OAuth
>
> fs.azure.account.oauth.provider.type..dfs.core.windows.net:
> org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
>
> fs.azure.account.oauth2.msi.tenant. .dfs.core.windows.net: 
>
> fs.azure.account.oauth2.client.id. .dfs.core.windows.net: 
>
> fs.azure.account.oauth2.client.endpoint. .dfs.core.windows.net:
> https://login.microsoftonline.com/<TENANT ID>/oauth2/token
>
>
>
> I also have this specified in the container environment variables.
>
> - name: ENABLE_BUILT_IN_PLUGINS
>
>value: flink-azure-fs-hadoop-1.16.1.jar
>
>
>
> I think I’m missing a configuration step because the MsiTokenProvider
> class is not found based on the logs. Any help would be appreciated.
>
>
>
>
>
> *Chris deRocco*
>
> Senior – Cybersecurity
>
> Chief Security Office | STORM Threat Analytics
>
>
>
> *AT&T*
>
> Middletown, NJ
>
> Phone: 732-639-9342
>
> Email: cd9...@att.com
>
>
>
>


Re: Custom Operator Placement for Kubernetes

2023-05-05 Thread Shammon FY
Hi chris,

I don't think there is an existing way to customize which node an operator
is placed on.

By the way, I think "predicting the optimal parallelism" of Flink jobs is
interesting. Flink currently supports an autoscaling mechanism; you can find
detailed information in this FLIP [1].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling

Best,
Shammon FY


On Fri, May 5, 2023 at 11:48 PM John Gerassimou 
wrote:

> Sorry for the mix-up. I read your message wrong. Please ignore my last
> reply.
>
> On Fri, May 5, 2023 at 11:42 AM John Gerassimou <
> john.gerassi...@unity3d.com> wrote:
>
>> Hi Chris,
>>
>> You should be able to do this using nodeSelector, or taints and
>> tolerations.
>>
>>
>> https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/flink-operator.yaml#L55:L61
>>
>> Thanks
>> John
>>
>> On Fri, May 5, 2023 at 8:38 AM  wrote:
>>
>>> Hi,
>>>
>>> is there a way to manually define which node an operator should be
>>> placed in, using Kubernetes?
>>>
>>> To give a bit more context, for my master's thesis, I'm looking into
>>> predicting the optimal parallelism degree for a node. To do so, we use a
>>> Zero Shot Model, which predicts the latency and throughput for a given
>>> query.  To increase performance, we need to manually place operators on
>>> different nodes in the network and incorporate other learning methods to
>>> see the best configuration.
>>>
>>> Regards,
>>> Chris
>>>
>>


Re: Unsubscribe

2023-05-04 Thread Shammon FY
To unsubscribe from the user@flink.apache.org and d...@flink.apache.org mailing
lists, please send an email with any content to user-unsubscr...@flink.apache.org
and dev-unsubscr...@flink.apache.org respectively; see [1].

[1] https://flink.apache.org/zh/community/

On Fri, May 5, 2023 at 10:43 AM 谢浩天  wrote:

> Hello:
>Unsubscribe!
>
>
> 谢浩天
>
>


Re: MSI Auth to Azure Storage Account with Flink Apache Operator not working

2023-05-04 Thread Shammon FY
Hi DEROCCO,

I think you need to check whether there is a hadoop-azure jar file in the
classpath of your Flink job. From the error message '*Caused by:
java.lang.ClassNotFoundException: Class
org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.*', your
Flink job may be missing this package.

Best,
Shammon FY


On Fri, May 5, 2023 at 4:40 AM DEROCCO, CHRISTOPHER  wrote:

>
>
> I receive the error:  *Caused by: java.lang.ClassNotFoundException: Class
> org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider not found.*
>
> I’m using flink 1.16 running in Azure Kubernetes using the Flink Apache
> Kubernetes Operator.
>
> I have the following specified in the spec.flinkConfiguration: as per the
> Apache Kubernetes operator documentation.
>
>
>
> fs.azure.createRemoteFileSystemDuringInitialization: "true"
>
> fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net:
> OAuth
>
> fs.azure.account.oauth.provider.type..
> dfs.core.windows.net:
> org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider
>
> fs.azure.account.oauth2.msi.tenant. .
> dfs.core.windows.net: 
>
> fs.azure.account.oauth2.client.id. .
> dfs.core.windows.net: 
>
> fs.azure.account.oauth2.client.endpoint. .
> dfs.core.windows.net: https://login.microsoftonline.com/<TENANT ID>/oauth2/token
>
>
>
> I also have this specified in the container environment variables.
>
> - name: ENABLE_BUILT_IN_PLUGINS
>
>value: flink-azure-fs-hadoop-1.16.1.jar
>
>
>
> I think I’m missing a configuration step because the MsiTokenProvider
> class is not found based on the logs. Any help would be appreciated.
>
>
>
>
>
> *Chris deRocco*
>
> Senior – Cybersecurity
>
> Chief Security Office | STORM Threat Analytics
>
>
>
> *AT&T*
>
> Middletown, NJ
>
> Phone: 732-639-9342
>
> Email: cd9...@att.com
>
>
>

