Found an issue when using Flink 1.19's AsyncScalarFunction

2024-04-09 Thread Xiaolong Wang
Hi,

I ran into a ClassNotFoundException when using Flink 1.19's
AsyncScalarFunction.

Stack trace:

Caused by: java.lang.ClassNotFoundException:
> org.apache.commons.text.StringSubstitutor
>
> at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
>
> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>
> at
> org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:150)
> ~[flink-dist-1.19.0.jar:1.19.0]
>
> at
> org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:113)
> ~[flink-dist-1.19.0.jar:1.19.0]
>
> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>
> at
> org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateProcessCode(AsyncCodeGenerator.java:173)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateFunction(AsyncCodeGenerator.java:77)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.getAsyncFunctionOperator(CommonExecAsyncCalc.java:146)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.createAsyncOneInputTransformation(CommonExecAsyncCalc.java:126)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.translateToPlanInternal(CommonExecAsyncCalc.java:89)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168)
> ~[?:?]
>
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
> ~[?:?]
>

Environment:

  flink image: flink:1.19.0-scala_2.12-java11

Solutions tried:

I tried packaging the needed dependency `commons-text-1.10.0.jar` (the
artifact that contains `org.apache.commons.text.StringSubstitutor`) into both
the user jar and the classpath, but the issue remained.

Would someone please help resolve this?


[flink-k8s-connector] In-place scale-up often takes several attempts to succeed.

2023-12-06 Thread Xiaolong Wang
Hi,

I'm playing with a Flink 1.18 demo with the auto-scaler and the adaptive
scheduler.

The operator correctly collects metrics and orders the job to scale up, but
it takes the job several rounds to reach the required parallelism.

E.g., the original parallelism for each vertex is something like this:

Vertex : write hourly_ads s3 || Parallelism : 100
Vertex : Source: ads-kafkasource -> Timestamps/Watermarks ->
ads-filter-action -> ads-cast || Parallelism : 100
Vertex : Commit hourly_ads || Parallelism : 1


Then the operator decides to scale the job to this:

Vertex : write hourly_ads s3 || Parallelism : 200
Vertex : Source: ads-kafkasource -> Timestamps/Watermarks ->
ads-filter-action -> ads-cast || Parallelism : 100
Vertex : Commit hourly_ads || Parallelism : 1


But when scaling, the vertex `write hourly_ads s3` does not jump directly
from 100 to 200; it first scales to an intermediate value, say 120, then
150, then 180. It takes 3~4 rounds for the job to reach the required
parallelism.
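The stepping behavior can be pictured with a small sketch (plain Python, not operator code; the per-round batch of new slots is a made-up assumption): with the adaptive scheduler, the job rescales each time a batch of new slots becomes available, so the observed parallelism climbs toward the target in increments rather than in one jump.

```python
# Toy model, not Flink code: the adaptive scheduler rescales whenever new
# slots arrive, so a 100 -> 200 scale-up shows up as intermediate steps.
def observed_parallelisms(current, target, slots_per_round):
    """Parallelism observed after each round of newly available slots."""
    steps = []
    while current < target:
        current = min(current + slots_per_round, target)
        steps.append(current)
    return steps

# E.g. scaling 100 -> 200 while TaskManagers arrive 30 slots at a time:
print(observed_parallelisms(100, 200, 30))  # [130, 160, 190, 200]
```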

I'm wondering how to avoid this issue?

Thanks in advance.


Re: Flink Kubernetes operator keeps reporting REST client timeout.

2023-11-20 Thread Xiaolong Wang
It seems the operator didn't get restarted automatically after the configmap
was changed. After a rollout restart, the exception disappeared. Never mind
this issue. Thanks.

On Tue, Nov 21, 2023 at 11:31 AM Xiaolong Wang 
wrote:

> Hi,
>
> Recently I upgraded the flink-kubernetes-operator from 1.4.0 to 1.6.1 to
> use Flink 1.18. After that, the operator kept reporting the following
> exception:
>
> 2023-11-21 03:26:50,505 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO
>> ][sn-push/sn-push-decision-maker-log-s3-hive-prd] Resource fully
>> reconciled, nothing to do...
>>
>> 2023-11-21 03:26:50,727 o.a.f.r.r.RestClient   [WARN
>> ][realtime-streaming/realtime-perf-report-main-prd-test] Rest endpoint
>> shutdown failed.
>>
>> java.util.concurrent.TimeoutException
>>
>> at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown
>> Source)
>>
>> at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>>
>> at org.apache.flink.runtime.rest.RestClient.shutdown(RestClient.java:227)
>>
>> at
>> org.apache.flink.client.program.rest.RestClusterClient.close(RestClusterClient.java:270)
>>
>> at
>> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getTaskManagersInfo(AbstractFlinkService.java:925)
>>
>> at
>> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getClusterInfo(AbstractFlinkService.java:621)
>>
>> at
>> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeClusterInfo(AbstractFlinkDeploymentObserver.java:85)
>>
>> at
>> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:75)
>>
>> at
>> org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
>>
>> at
>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:129)
>>
>> at
>> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>>
>> at
>> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>>
>> at
>> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>>
>> at
>> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>>
>> at
>> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>>
>> at
>> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>>
>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
>> Source)
>>
>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
>> Source)
>>
>> at java.base/java.lang.Thread.run(Unknown Source)
>>
>
> I tried increasing the REST timeout param
> "job.autoscaler.flink.rest-client.timeout" to 60 s, yet it did not resolve
> the issue.
>
> Could you help check this out? Thanks in advance.
>


Flink Kubernetes operator keeps reporting REST client timeout.

2023-11-20 Thread Xiaolong Wang
Hi,

Recently I upgraded the flink-kubernetes-operator from 1.4.0 to 1.6.1 to
use Flink 1.18. After that, the operator kept reporting the following
exception:

2023-11-21 03:26:50,505 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO
> ][sn-push/sn-push-decision-maker-log-s3-hive-prd] Resource fully
> reconciled, nothing to do...
>
> 2023-11-21 03:26:50,727 o.a.f.r.r.RestClient   [WARN
> ][realtime-streaming/realtime-perf-report-main-prd-test] Rest endpoint
> shutdown failed.
>
> java.util.concurrent.TimeoutException
>
> at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown
> Source)
>
> at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
>
> at org.apache.flink.runtime.rest.RestClient.shutdown(RestClient.java:227)
>
> at
> org.apache.flink.client.program.rest.RestClusterClient.close(RestClusterClient.java:270)
>
> at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getTaskManagersInfo(AbstractFlinkService.java:925)
>
> at
> org.apache.flink.kubernetes.operator.service.AbstractFlinkService.getClusterInfo(AbstractFlinkService.java:621)
>
> at
> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeClusterInfo(AbstractFlinkDeploymentObserver.java:85)
>
> at
> org.apache.flink.kubernetes.operator.observer.deployment.AbstractFlinkDeploymentObserver.observeInternal(AbstractFlinkDeploymentObserver.java:75)
>
> at
> org.apache.flink.kubernetes.operator.observer.AbstractFlinkResourceObserver.observe(AbstractFlinkResourceObserver.java:49)
>
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:129)
>
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:56)
>
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:138)
>
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:96)
>
> at
> org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
>
> at
> io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:95)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
>
> at
> io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
>
> at
> io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:414)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
>
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
>
> at java.base/java.lang.Thread.run(Unknown Source)
>

I tried increasing the REST timeout param
"job.autoscaler.flink.rest-client.timeout" to 60 s, yet it did not resolve
the issue.

Could you help check this out? Thanks in advance.
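A note for later readers: the autoscaler option above only covers the autoscaler's own client. The operator keeps a separate Flink client timeout in its configuration; a hedged sketch follows (key names assumed from the flink-kubernetes-operator configuration reference, values purely illustrative — verify against your operator version):

```
# Illustrative values; check the keys against your operator version's docs.
kubernetes.operator.flink.client.timeout: 30 s
kubernetes.operator.observer.rest-ready.delay: 10 s
```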


Re: Flink K8S operator does not support IPv6

2023-09-05 Thread Xiaolong Wang
FYI, adding the environment variable
`KUBERNETES_DISABLE_HOSTNAME_VERIFICATION=true` works for me.

This env variable needs to be added to both the Flink operator and the
Flink job definition.
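A sketch of where the variable goes on the job side (field names follow the FlinkDeployment pod-template convention; treat this as illustrative, not a verified manifest):

```
spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          env:
            - name: KUBERNETES_DISABLE_HOSTNAME_VERIFICATION
              value: "true"
```

For the operator itself, the same variable can be injected through the operator pod's environment settings (e.g. via its Helm chart values).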

On Tue, Aug 8, 2023 at 12:03 PM Xiaolong Wang 
wrote:

> Ok, thank you.
>
> On Tue, Aug 8, 2023 at 11:22 AM Peter Huang 
> wrote:
>
>> We will handle it asap. Please check the status of this jira
>> https://issues.apache.org/jira/browse/FLINK-32777
>>
>> On Mon, Aug 7, 2023 at 8:08 PM Xiaolong Wang 
>> wrote:
>>
>>> Hi,
>>>
>>> I was testing flink-kubernetes-operator in an IPv6 cluster and found out
>>> the below issues:
>>>
>>> *Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname
>>>> fd70:e66a:970d::1 not verified:*
>>>>
>>>> *certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=*
>>>>
>>>> *DN: CN=kube-apiserver*
>>>>
>>>> *subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
>>>> 2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
>>>> c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com
>>>> <http://c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com>,
>>>> ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
>>>> kubernetes.default, kubernetes.default.svc,
>>>> kubernetes.default.svc.cluster.local]*
>>>>
>>>
>>> Which seemed to be related to a known issue
>>> <https://github.com/square/okhttp/issues/7368> of okhttp.
>>>
>>> I'm wondering if there is a plan to support IPv6 for
>>> flink-kubernetes-operator in the near future ?
>>>
>>


Re: Flink K8S operator does not support IPv6

2023-08-07 Thread Xiaolong Wang
Ok, thank you.

On Tue, Aug 8, 2023 at 11:22 AM Peter Huang 
wrote:

> We will handle it asap. Please check the status of this jira
> https://issues.apache.org/jira/browse/FLINK-32777
>
> On Mon, Aug 7, 2023 at 8:08 PM Xiaolong Wang 
> wrote:
>
>> Hi,
>>
>> I was testing flink-kubernetes-operator in an IPv6 cluster and found out
>> the below issues:
>>
>> *Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname
>>> fd70:e66a:970d::1 not verified:*
>>>
>>> *certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=*
>>>
>>> *DN: CN=kube-apiserver*
>>>
>>> *subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
>>> 2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
>>> c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com
>>> <http://c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com>,
>>> ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
>>> kubernetes.default, kubernetes.default.svc,
>>> kubernetes.default.svc.cluster.local]*
>>>
>>
>> Which seemed to be related to a known issue
>> <https://github.com/square/okhttp/issues/7368> of okhttp.
>>
>> I'm wondering if there is a plan to support IPv6 for
>> flink-kubernetes-operator in the near future ?
>>
>


Flink K8S operator does not support IPv6

2023-08-07 Thread Xiaolong Wang
Hi,

I was testing flink-kubernetes-operator in an IPv6 cluster and ran into
the issue below:

*Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname
> fd70:e66a:970d::1 not verified:*
>
> *certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=*
>
> *DN: CN=kube-apiserver*
>
> *subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
> 2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
> c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com,
> ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
> kubernetes.default, kubernetes.default.svc,
> kubernetes.default.svc.cluster.local]*
>

This seemed to be related to a known issue of okhttp
(https://github.com/square/okhttp/issues/7368).

I'm wondering if there is a plan to support IPv6 for
flink-kubernetes-operator in the near future?


[Bug-report]Flink-operator 1.6.0 repo does not exist yet

2023-08-02 Thread Xiaolong Wang
Hi,

I noticed that the latest flink-operator documentation points to v1.6.0,
yet when running `helm repo add flink-operator-repo
https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.0/`
to install it, the given URL does not exist.

I suppose that 1.6.0 is not ready yet and the documentation
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/)
should point to 1.5.0?


Re: How to resume a job from checkpoint with the SQL gateway.

2023-07-18 Thread Xiaolong Wang
Thank you for the information. That solution works for me!

On Tue, Jul 18, 2023 at 3:00 PM Shammon FY  wrote:

> Hi Xiaolong,
>
> For new versions such as flink-1.17, flink sql-gateway supports job
> management and user can stop/start jobs with savepoint. You can start a job
> with a given savepoint path as [1] and stop a job with or without savepoint
> as [2].
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#start-a-sql-job-from-a-savepoint
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sqlclient/#terminating-a-job
>
> Best,
> Shammon FY
>
>
> On Tue, Jul 18, 2023 at 9:56 AM Xiaolong Wang 
> wrote:
>
>> Hi, Shammon,
>>
>> I know that the job manager can auto-recover via HA configurations, but
>> what if I want to upgrade the running Flink SQL submitted by the Flink SQL
>> gateway ?
>>
>> In normal cases, I can use
>>
>>> ./bin/flink run-application -s ${SAVEPOINT_PATH} local://${FLINK_JOB_JAR}
>>
>> to resume a Flink job from a savepoint/checkpoint. The question is, how
>> to do so with the Flink SQL gateway? What should I fill in for the
>> ${FLINK_JOB_JAR} field?
>>
>> Thanks in advance.
>>
>> On Mon, Jul 17, 2023 at 9:14 AM Shammon FY  wrote:
>>
>>> Hi Xiaolong,
>>>
>>> When a streaming job is submitted via Sql-Gateway, its lifecycle is no
>>> longer related to Sql Gateway.
>>>
>>> Returning to the issue of job recovery, I think if your job cluster is
>>> configured with HA, jobmanager will recover running streaming jobs from
>>> their checkpoints after a failover occurs.
>>>
>>> Best,
>>> Shammon FY
>>>
>>>
>>> On Thu, Jul 13, 2023 at 10:22 AM Xiaolong Wang <
>>> xiaolong.w...@smartnews.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm currently working on providing a SQL gateway to submit both
>>>> streaming and batch queries.
>>>>
>>>> My question is, if a streaming SQL is submitted and then the jobmanager
>>>> crashes, is it possible to resume the streaming SQL from the latest
>>>> checkpoint with the SQL gateway ?
>>>>
>>>>
>>>>
>>>
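For later readers: the restore mechanism documented in [1] is a session option rather than a jar path, so nothing plays the ${FLINK_JOB_JAR} role in the gateway. A minimal sketch (the savepoint path and table names are placeholders):

```
-- Placeholder path; set this in the gateway session before submitting.
SET 'execution.savepoint.path' = 's3://my-bucket/savepoints/savepoint-xxxx';

-- The next job submitted in this session resumes from that state.
INSERT INTO my_sink SELECT * FROM my_source;
```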


How to resume a job from checkpoint with the SQL gateway.

2023-07-12 Thread Xiaolong Wang
Hi,

I'm currently working on providing a SQL gateway to submit both streaming
and batch queries.

My question is, if a streaming SQL is submitted and then the jobmanager
crashes, is it possible to resume the streaming SQL from the latest
checkpoint with the SQL gateway ?


SQL-gateway Failed to Run

2023-07-03 Thread Xiaolong Wang
Hi,
I've been testing the Flink SQL gateway with some simple Hive queries but
ran into some exceptions.


Environment Description:
Run on: Kubernetes
Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
Steps to run:
1. Apply a `flinkdeployment` of a Flink session cluster to the Flink operator
```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster-example
  namespace: xxx
spec:
  image: xxx/flink:1.17-sql-gateway-dev
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    pipeline.max-parallelism: "1000"
    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: xxx
    execution.checkpointing.interval: 1m
    execution.checkpointing.timeout: 30m
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: xxx
    akka.framesize: 20971520b
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    taskmanager.memory.managed.fraction: "0.2"
    kubernetes.hadoop.conf.config-map.name: xxx
  serviceAccount: default
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      serviceAccount: default
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 1
```
This image was built with a Hadoop dependency, and it references an existing
Hadoop configmap.

2. Log in to the job-manager pod and run the following:
`./bin/sql-gateway.sh start-foreground
-Dsql-gateway.endpoint.type=hiveserver2
-Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`

3. Start a beeline session, connect to the SQL gateway, then run a simple
Hive query:
`select count(1) from simple_demo_output where dt = '2021-08-14';`

4. The SQL gateway fails with the following logs:
```

2023-07-03 06:27:11,078 INFO
org.apache.flink.client.program.rest.RestClusterClient
  [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).

2023-07-03 06:27:15,092 INFO
org.apache.flink.client.program.rest.RestClusterClient
  [] - Successfully submitted job 'collect'
(4c99c40392cb935d3df94891655d2ce5) to '
http://flink-session-cluster-example-rest.realtime-streaming:8081'.

2023-07-03 06:27:15,879 ERROR
org.apache.flink.table.gateway.service.operation.OperationManager [] -
Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.

org.apache.flink.table.api.TableException: Failed to execute sql

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]

at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]

at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]

at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]

at java.lang.Thread.run(Unknown Source) [?:?]

Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'collect'.

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
~[flink-dist-1.17.1.jar:1.17.1]

at
org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
~[flink-table-planner_2.12-1.17.1.jar:1.17.1]

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:955)
~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

... 13 more

Caused by: java.lang.RuntimeException: Error while waiting for job to be
initialized

at
org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:151)
~[flink-dist-1.17.1.jar:1.17.1]

at
```

Flink on Native K8s jobs turn in to `SUSPENDED` status unexpectedly.

2022-05-11 Thread Xiaolong Wang
Hello,

Recently our Flink jobs on native K8s unexpectedly entered the `SUSPENDED`
status and got restarted for no apparent reason.

Flink version: 1.13.2

Logs:
```
2022-05-11 05:01:41

2022-05-10 21:01:41,771 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 17921 (type=CHECKPOINT) @ 1652216501302 for job
.\n
2022-05-11 05:01:43

2022-05-10 21:01:42,860 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 17921 for job  (11840 bytes in
866 ms).\n
2022-05-11 05:04:34

2022-05-10 21:04:34,550 INFO
org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Creating a
new watch on TaskManager pods.\n
2022-05-11 05:06:43

2022-05-10 21:06:43,512 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 17922 (type=CHECKPOINT) @ 1652216802860 for job
.\n
2022-05-11 05:06:44

2022-05-10 21:06:44,441 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 17922 for job  (11840 bytes in
977 ms).\n
2022-05-11 05:11:45

2022-05-10 21:11:44,826 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering
checkpoint 17923 (type=CHECKPOINT) @ 1652217104441 for job
.\n
2022-05-11 05:11:45

2022-05-10 21:11:45,537 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed
checkpoint 17923 for job  (11840 bytes in
646 ms).\n
2022-05-11 05:12:36

2022-05-10 21:12:36,746 INFO
org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess
[] - Stopping SessionDispatcherLeaderProcess.\n
2022-05-11 05:12:36

2022-05-10 21:12:36,747 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping
dispatcher akka.tcp://flink@10.2.70.34:6123/user/rpc/dispatcher_1.\n
2022-05-11 05:12:36

2022-05-10 21:12:36,747 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Stopping all
currently running jobs of dispatcher akka.tcp://
flink@10.2.70.34:6123/user/rpc/dispatcher_1.\n
2022-05-11 05:12:36

2022-05-10 21:12:36,749 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Stopping the JobMaster for job
insert-into_default_catalog.default_database.sn_fstore_location_cluster_raw_scylla_sink().\n
2022-05-11 05:12:36

2022-05-10 21:12:36,752 INFO
org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
 reached terminal state SUSPENDED.\n
2022-05-11 05:12:36

2022-05-10 21:12:36,752 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job
insert-xxx_sink () switched from state
RUNNING to SUSPENDED.\n
2022-05-11 05:12:36

org.apache.flink.util.FlinkException: Scheduler is being stopped.\n
2022-05-11 05:12:36

at
org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:607)
~[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at
org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962)
~[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at
org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926)
~[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398)
~[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at
org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214)
~[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563)
~[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186)
~[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.13.2.jar:1.13.2]\n
2022-05-11 05:12:36

at
```

How to mount PVC volumes using Flink Native Kubernetes ?

2021-09-10 Thread Xiaolong Wang
Hi,

I'm facing a tough question: I want to start a Flink native Kubernetes job
with each task manager pod mounted with an aws-ebs PVC.

My first thought was to use the pod-template file, but it soon hit a dead
end. Since the pod template is the same for every task manager pod, how can
I mount a different PVC on each one?

This issue is quite puzzling; could you please help me?

Thanks in advance!
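One possible direction (an assumption on my part, not a confirmed Flink feature): Kubernetes generic ephemeral volumes let a single pod template stamp out one PVC per pod, which sidesteps the "same template, different PVCs" problem. A sketch of such a pod-template fragment (names, storage class, and sizes are placeholders; requires a Kubernetes version where generic ephemeral volumes are enabled):

```
spec:
  containers:
    - name: flink-main-container
      volumeMounts:
        - name: scratch
          mountPath: /data
  volumes:
    - name: scratch
      ephemeral:
        volumeClaimTemplate:
          spec:
            accessModes: ["ReadWriteOnce"]
            storageClassName: gp2   # placeholder EBS-backed storage class
            resources:
              requests:
                storage: 10Gi
```

With this, each task manager pod gets its own dynamically provisioned EBS-backed claim, which is deleted along with the pod.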


Flink failed to resume from checkpoint stored on S3

2020-07-22 Thread Xiaolong Wang
Dear community,
One of my Flink jobs failed yesterday, and when I tried to resume it from
the latest checkpoint, the following exception happened:


```
Log Type: jobmanager.err

Log Upload Time: Wed Jul 22 09:04:24 + 2020

Log Length: 506

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/mnt/yarn/usercache/ec2-user/appcache/application_1591011685424_1054/filecache/10/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Log Type: jobmanager.log

Log Upload Time: Wed Jul 22 09:04:24 + 2020

Log Length: 65177

Showing 4096 bytes of 65177 total.

SchedulerBase.<init>(SchedulerBase.java:215)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
2020-07-22 09:04:22,766 ERROR
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  -
Unhandled exception.
java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.io.FileNotFoundException: Cannot find meta data file
'_metadata' in directory
's3:///flink/checkpoint_dir/65786c3307a10e79a52b4de478cfe996/chk-7853'.
Please try to load the checkpoint/savepoint directly from the metadata file
instead of the directory.
at
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpointPointer(AbstractFsCheckpointStorage.java:258)
at
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage.resolveCheckpoint(AbstractFsCheckpointStorage.java:110)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1152)
at
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:306)
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:239)
at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
2020-07-22 09:04:22,771 INFO  org.apache.flink.runtime.blob.BlobServer
 - Stopped BLOB server at 0.0.0.0:34683

Log Type: jobmanager.out

Log Upload Time: Wed Jul 22 09:04:24 + 2020

Log Length: 0
```
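A rough sketch (not Flink source, inferred from the `AbstractFsCheckpointStorage.resolveCheckpointPointer` frame in the trace) of the check that produced the error: a checkpoint pointer naming a directory must contain a `_metadata` file, otherwise resolution fails.

```python
# Toy reconstruction of the failing check; not the actual Flink code.
import os

def resolve_checkpoint_pointer(path):
    """Return the metadata file a checkpoint pointer resolves to."""
    if os.path.isdir(path):
        meta = os.path.join(path, "_metadata")
        if not os.path.isfile(meta):
            raise FileNotFoundError(
                f"Cannot find meta data file '_metadata' in directory '{path}'. "
                "Please try to load the checkpoint/savepoint directly from the "
                "metadata file instead of the directory."
            )
        return meta
    return path  # the pointer already names the metadata file itself
```

In practice this means the `chk-7853` directory no longer contains a `_metadata` file (for example because the checkpoint was incomplete or already cleaned up), so the job has to be resumed from an older, complete checkpoint or from a metadata file directly.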


How to subscribe to a Kinesis stream using enhanced fan-out?

2020-05-12 Thread Xiaolong Wang
Hello Flink Community!

I'm currently working on a project relying on AWS Kinesis. With the
provided connector (flink-connector-kinesis_2.11:1.10.0), I can consume
messages.

But as the main stream is shared among several other teams, I was required
to use the enhanced fan-out feature of Kinesis. I checked the connector
code and found no implementation of it.
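For later readers: to my knowledge, enhanced fan-out support was added to the Kinesis connector in a later release (Flink 1.12) and is switched on through consumer properties rather than a separate API. A hedged sketch of the relevant keys (names assumed from the connector's `ConsumerConfigConstants`; the consumer name is hypothetical — verify against your connector version):

```python
# Assumed property keys; verify against your connector version before use.
efo_consumer_config = {
    "aws.region": "ap-northeast-1",
    "flink.stream.recordpublisher": "EFO",           # use enhanced fan-out
    "flink.stream.efo.consumername": "my-team-app",  # hypothetical consumer name
}

# These would be handed to the Kinesis consumer as its configuration
# Properties; shown here in a plain dict only to document the keys.
print(efo_consumer_config["flink.stream.recordpublisher"])  # EFO
```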

Has this issue occurred to anyone before?

Thanks for your help.