How to use Flink SQL Gateway with Flink on native Kubernetes

2023-07-03 Thread chaojianok
Hi all, I have a question.


I have deployed Flink on a Kubernetes cluster using the native Kubernetes mode, and
now I need to use Flink SQL Gateway in this Flink cluster. Does anyone have a good approach?
My current approach is to exec into a pod, start the SQL Gateway there, and then
create a flink-sql-gateway Service in Kubernetes, so that the gateway can be accessed
through this Service. The problem with this method is that the deployment requires
entering the pod to start the service manually, which is bad for automated deployment.
The specific commands are below; please take a look and let me know whether there is a
better solution that avoids this problem.


1. Create the Flink cluster
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort


2. Exec into the pod and start the SQL Gateway service with ./bin/sql-gateway.sh start
-Dsql-gateway.endpoint.rest.address=localhost, then exit the pod


3. Create the flink-sql-gateway Service
kubectl expose deployment flink-cluster --type=NodePort --port=8083 \
--name=flink-sql-gateway -n flink

Re: Query around Rocksdb

2023-07-03 Thread Yanfei Lei
Hi neha,

Due to a limitation of RocksDB, we cannot create a
strict-capacity-limit LRUCache that is shared among RocksDB instances;
FLINK-15532 [1] was created to track this.
BTW, have you set a TTL for this job [2]? TTL can help control the state size.

[1] https://issues.apache.org/jira/browse/FLINK-15532
[2] https://issues.apache.org/jira/browse/FLINK-31089
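For reference, a minimal sketch of what enabling state TTL might look like. The one-day TTL, the state name, and the compaction-filter threshold are illustrative assumptions, not values from this thread:

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlExample {
    // Sketch: bound state size by expiring entries that have not been
    // written for one day (assumed TTL).
    public static ValueStateDescriptor<String> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            // let RocksDB drop expired entries during compaction
            .cleanupInRocksdbCompactFilter(1000)
            .build();

        ValueStateDescriptor<String> descriptor =
            new ValueStateDescriptor<>("my-state", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}
```

The descriptor would then be used as usual from open() of a rich function; expired entries are dropped lazily and during RocksDB compaction.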

Shammon FY wrote on Tuesday, 2023-07-04 at 09:08:
>
> Hi neha,
>
> Which Flink version are you using? We have also encountered continuous 
> growth of off-heap memory in the TM of a session cluster before; the cause 
> was that memory fragments could not be reused, as in issue [1]. You can 
> check the memory allocator and try switching to jemalloc, as described in 
> docs [2] and [3].
>
> [1] https://issues.apache.org/jira/browse/FLINK-19125
> [2] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
> [3] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator
>
> Best,
> Shammon FY
>
> On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:
>>
>> Hello,
>>
>> I am trying to debug the unbounded memory consumption by the Flink process. 
>> The heap size of the process remains the same. The size of the RSS of the 
>> process keeps on increasing. I suspect it might be because of RocksDB.
>>
>> We have the default value of state.backend.rocksdb.memory.managed set to 
>> true. Can anyone confirm whether, with this config, RocksDB can still 
>> consume unbounded native memory?
>>
>> If yes, what metrics can I check to confirm the issue? Any help would be 
>> appreciated.



-- 
Best,
Yanfei


Re: Checkpointed data size is zero

2023-07-03 Thread Yanfei Lei
Hi Kamal,

Is the Full Checkpoint Data Size[1] also zero? If not, it may be that
no data is processed during this checkpoint.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/monitoring/checkpoint_monitoring/

Shammon FY wrote on Tuesday, 2023-07-04 at 09:10:

>
> Hi Kamal,
>
> You can check in the Flink web UI whether the Flink job has read data from the 
> source; it shows the total record count and size for each operator.
>
> Best,
> Shammon FY
>
> On Sat, Jul 1, 2023 at 4:53 PM Kamal Mittal via user  
> wrote:
>>
>> Hello Community,
>>
>>
>>
>> I have a requirement to read data arriving over a TCP socket stream, and for the 
>> same I have written a custom source function that reads data from the TCP socket.
>>
>>
>>
>> Job is running successfully but in flink dashboard checkpoint overview, 
>> checkpointed data size is 0.
>>
>>
>>
>> Could you please advise whether there is anything I need to check, or some 
>> issue/limitation due to TCP streaming?
>>
>>
>>
>> Rgds,
>>
>> Kamal



--
Best,
Yanfei


Re: PartitionNotFoundException restart loop

2023-07-03 Thread Shammon FY
Hi,

A PartitionNotFoundException usually means that a downstream task sent a partition
request to an upstream task before the upstream task was deployed successfully. Normally the downstream task retries and only throws the exception after a timeout. You can check whether there are other exception logs, and investigate why the upstream task failed to deploy.

Best,
Shammon FY

On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

>
> Exception log:
>
> 2023-07-03 20:30:15,164 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink:
> Sink 3 (2/45)
> (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_
> 3093 not found.
> at 
> org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
>
>
>
> From: zhan...@eastcom-sw.com
> Date: 2023-07-04 09:25
> To: user-zh
> Subject: PartitionNotFoundException restart loop
> Hi, I have two jobs with fairly heavy traffic (300 million / 600 million records
> per day). After running normally for about 5-6 days, they hit a
> PartitionNotFoundException and then keep restarting in a loop.
>
> After adding the following parameters to flink-conf.yaml, they likewise hit the
> PartitionNotFoundException loop after 6 days and keep restarting:
> taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> taskmanager.network.max-num-tcp-connections: 16
>
> The current version is 1.17.1. The same jobs and data never had this problem on
> 1.14.4. Is there any way to solve this?
>
>


CFP for the 2nd Performance Engineering track at Community over Code NA 2023

2023-07-03 Thread Brebner, Paul
Hi Apache Flink people - There are only 10 days left to submit a talk proposal 
(title and abstract only) for Community over Code NA 2023! The 2nd Performance 
Engineering track is on this year so any Apache project-related performance and 
scalability talks are welcome, here's the track CFP for more ideas and links 
including the official Apache submission page: 
https://www.linkedin.com/pulse/call-papers-2nd-performance-engineering-track-over-code-brebner/
  - Paul Brebner and Roger Abelenda



Re: SQL-gateway Failed to Run

2023-07-03 Thread Shammon FY
Hi Xiaolong,

From the exception it seems that the Flink session cluster is not
running properly. Can you visit the Flink web UI and check whether everything is OK?

Best,
Shammon FY

On Mon, Jul 3, 2023 at 2:43 PM Xiaolong Wang 
wrote:

> Hi,
> I've tested the Flink SQL-gateway to run some simple Hive queries but met
> some exceptions.
>
>
> Environment Description:
> Run on : Kubernetes
> Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
> Steps to run:
> 1. Apply a `flinkdeployment` of flink session cluster to flink operator
> ```
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: flink-session-cluster-example
>   namespace: xxx
> spec:
>   image: xxx/flink:1.17-sql-gateway-dev
>   flinkVersion: v1_17
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "2"
> pipeline.max-parallelism: "1000"
> state.backend.type: rocksdb
> state.backend.incremental: "true"
> state.checkpoints.dir: xxx
> execution.checkpointing.interval: 1m
> execution.checkpointing.timeout: 30m
> high-availability:
> org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.storageDir: xxx
> akka.framesize: 20971520b
> execution.checkpointing.externalized-checkpoint-retention:
> RETAIN_ON_CANCELLATION
> taskmanager.memory.managed.fraction: "0.2"
> kubernetes.hadoop.conf.config-map.name: xxx
>   serviceAccount: default
>   podTemplate:
> apiVersion: v1
> kind: Pod
> metadata:
>   name: pod-template
> spec:
>   serviceAccount: default
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "4096m"
>   cpu: 1
> ```
> This image has been built with a `hadoop dependency` , an existing `hadoop
> configmap`.
>
> 2. Login to the job-manager pod and run the followings
> `./bin/sql-gateway.sh start-foreground
> -Dsql-gateway.endpoint.type=hiveserver2
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`
>
> 3. Start a beeline and connect to the SQL gateway then run a simple Hive
> query
> `select count(1) from simple_demo_output where dt = '2021-08-14';`
>
> 4.The SQL gateway goes wrong with the following logs:
> ```
>
> 2023-07-03 06:27:11,078 INFO  
> org.apache.flink.client.program.rest.RestClusterClient
>   [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).
>
> 2023-07-03 06:27:15,092 INFO  
> org.apache.flink.client.program.rest.RestClusterClient
>   [] - Successfully submitted job 'collect'
> (4c99c40392cb935d3df94891655d2ce5) to '
> http://flink-session-cluster-example-rest.realtime-streaming:8081'.
>
> 2023-07-03 06:27:15,879 ERROR
> org.apache.flink.table.gateway.service.operation.OperationManager [] -
> Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.
>
> org.apache.flink.table.api.TableException: Failed to execute sql
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
> ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
> ~[flink-sql-gateway-1.17.1.jar:1.17.1]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
>
> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
>
> at java.lang.Thread.run(Unknown Source) [?:?]
>
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job
> 'collect'.
>
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
> ~[flink-dist-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
> 

Re: Checkpointed data size is zero

2023-07-03 Thread Shammon FY
Hi Kamal,

You can check in the Flink web UI whether the Flink job has read data from the
source; it shows the total record count and size for each operator.

Best,
Shammon FY

On Sat, Jul 1, 2023 at 4:53 PM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> I have a requirement to read data arriving over a TCP socket stream, and for
> the same I have written a custom source function that reads data from the TCP socket.
>
>
>
> Job is running successfully but in flink dashboard checkpoint overview,
> checkpointed data size is 0.
>
>
>
> Could you please advise whether there is anything I need to check, or some
> issue/limitation due to TCP streaming?
>
>
>
> Rgds,
>
> Kamal
>
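A related point worth checking: a custom source only contributes to the checkpointed data size if it actually snapshots state. Below is a hedged sketch of a TCP line source that does snapshot something (the class name, the "lines read" state, and the host/port are invented for the example, not taken from Kamal's code). It also uses the legacy SourceFunction API, which matches the kind of source described above, though the newer Source interface is preferred in recent Flink versions:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.Collections;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class TcpLineSource extends RichSourceFunction<String>
        implements CheckpointedFunction {

    private final String host;
    private final int port;
    private volatile boolean running = true;

    private transient ListState<Long> linesReadState;  // operator state
    private long linesRead;

    public TcpLineSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()))) {
            String line;
            while (running && (line = in.readLine()) != null) {
                // Emit and update the counter atomically w.r.t. checkpoints.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(line);
                    linesRead++;
                }
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // This is what gives the source a non-zero checkpointed size.
        linesReadState.update(Collections.singletonList(linesRead));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        linesReadState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("lines-read", Long.class));
        for (Long v : linesReadState.get()) {
            linesRead = v;
        }
    }
}
```

If neither the source nor any downstream operator keeps state, a checkpointed data size near zero is expected even while data is flowing.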


Re: Query around Rocksdb

2023-07-03 Thread Shammon FY
Hi neha,

Which Flink version are you using? We have also encountered continuous
growth of off-heap memory in the TM of a session cluster before; the cause
was that memory fragments could not be reused, as in issue [1]. You can check
the memory allocator and try switching to jemalloc, as described in docs [2]
and [3].

[1] https://issues.apache.org/jira/browse/FLINK-19125
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.12/#deployment
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#switching-the-memory-allocator

Best,
Shammon FY

On Sat, Jul 1, 2023 at 2:58 PM neha goyal  wrote:

> Hello,
>
> I am trying to debug the unbounded memory consumption by the Flink
> process. The heap size of the process remains the same. The size of the RSS
> of the process keeps on increasing. I suspect it might be because of
> RocksDB.
>
> We have the default value of state.backend.rocksdb.memory.managed set to
> true. Can anyone confirm whether, with this config, RocksDB can still consume
> unbounded native memory?
>
> If yes, what metrics can I check to confirm the issue? Any help would be
> appreciated.
>
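To help confirm where the native memory goes, RocksDB's native-memory metrics can be switched on (the option keys below are real Flink options; values and the overall setup are an illustrative sketch, not a prescribed fix):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbMemoryDebugExample {
    public static StreamExecutionEnvironment createEnv() {
        Configuration conf = new Configuration();
        // Keep managed memory on, so RocksDB block cache and write buffers
        // are budgeted by Flink rather than growing freely.
        conf.setString("state.backend.rocksdb.memory.managed", "true");
        // Expose native-memory metrics in the TM metrics, so RSS growth can
        // be compared against RocksDB's actual cache/memtable usage.
        conf.setString("state.backend.rocksdb.metrics.block-cache-usage", "true");
        conf.setString("state.backend.rocksdb.metrics.cur-size-all-mem-tables", "true");
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}
```

If those metrics stay flat while RSS keeps climbing, the growth is likely outside RocksDB's budgeted memory (e.g. allocator fragmentation, as discussed above).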


Re: Difference between different values for starting offset

2023-07-03 Thread Mason Chen
Hi Oscar,

You are correct that the OffsetsInitializer is only effective when there
is no Flink state--in addition, if you have partition discovery on, this
initializer will be reused for the new partitions (i.e. splits) discovered.
Assuming the job is continuing from the offset in Flink state, there is no
difference between the two strategies. This is because the
`auto.offset.reset` setting maps to the `OffsetResetStrategy`, and
OffsetsInitializer.earliest uses `earliest` too.

Best,
Mason

On Mon, Jul 3, 2023 at 6:56 AM Oscar Perez via user 
wrote:

> Hei,
>
> Looking at the flink documentation for kafkasource I see the following
> values for starting offset:
>
> OffsetsInitializer.earliest
> OffsetsInitializer.latest
> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
>
> From what I understand, OffsetsInitializer.earliest uses the earliest offset the
> first time, but later deployments will use the committed offset in the Flink
> state to resume from there. If that is the case, what is the difference
> between OffsetsInitializer.earliest and
> committedOffsets(OffsetResetStrategy.EARLIEST) if both continue from the
> committed offset after redeployment?
>
> Thanks!
> Oscar
>
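A sketch of the two strategies side by side (broker, topic, and group names are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class StartingOffsetsExample {
    public static KafkaSource<String> build(boolean fromCommitted) {
        return KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")   // placeholder
            .setTopics("my-topic")                // placeholder
            .setGroupId("my-group")               // placeholder
            // Either initializer is consulted only when there is no offset in
            // Flink state; on restore from checkpoint/savepoint they behave
            // identically. On a fresh start, earliest() always begins at the
            // earliest offset, while committedOffsets(EARLIEST) first looks
            // for offsets committed to Kafka by the consumer group and only
            // falls back to earliest if none exist.
            .setStartingOffsets(fromCommitted
                ? OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
                : OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    }
}
```

So the observable difference is limited to the first deployment (or any start without Flink state): whether previously committed Kafka group offsets are honored or ignored.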


Changing vertex parallelism

2023-07-03 Thread Nikolaos Paraskakis
Is there any way of changing job vertex parallelism at runtime (downtime
included)? For example, via the REST API?

Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread Jark Wu
Congrats everyone!

Best,
Jark

> On 2023-07-03 at 22:37, Yuval Itzchakov wrote:
> 
> Congrats team!
> 
> On Mon, Jul 3, 2023, 17:28 Jing Ge via user wrote:
>> Congratulations!
>> 
>> Best regards,
>> Jing
>> 
>> 
>> On Mon, Jul 3, 2023 at 3:21 PM yuxia wrote:
>>> Congratulations!
>>> 
>>> Best regards,
>>> Yuxia
>>> 
>>> From: "Pushpa Ramakrishnan"
>>> To: "Xintong Song"
>>> Cc: "dev", "User"
>>> Date: Monday, 2023-07-03 20:36:30
>>> Subject: Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award
>>> 
>>> Congratulations 🥳
>>> 
>>> On 03-Jul-2023, at 3:30 PM, Xintong Song wrote:
>>> 
>>> Dear Community,
>>> 
>>> I'm pleased to share this good news with everyone. As some of you may have 
>>> already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].
>>> 
>>> "Apache Flink greatly expanded the use of stream data-processing." -- 
>>> SIGMOD Awards Committee
>>> 
>>> SIGMOD is one of the most influential data management research conferences 
>>> in the world. The Systems Award is awarded to an individual or set of 
>>> individuals to recognize the development of a software or hardware system 
>>> whose technical contributions have had significant impact on the theory or 
>>> practice of large-scale data management systems. Winning of the award 
>>> indicates the high recognition of Flink's technological advancement and 
>>> industry influence from academia.
>>> 
>>> As an open-source project, Flink wouldn't have come this far without the 
>>> wide, active and supportive community behind it. Kudos to all of us who 
>>> helped make this happen, including the over 1,400 contributors and many 
>>> others who contributed in ways beyond code.
>>> 
>>> Best,
>>> Xintong (on behalf of the Flink PMC)
>>> 
>>> [1] https://sigmod.org/2023-sigmod-systems-award/
>>> 



Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread Yuval Itzchakov
Congrats team!

On Mon, Jul 3, 2023, 17:28 Jing Ge via user  wrote:

> Congratulations!
>
> Best regards,
> Jing
>
>
> On Mon, Jul 3, 2023 at 3:21 PM yuxia  wrote:
>
>> Congratulations!
>>
>> Best regards,
>> Yuxia
>>
>> --
>> *From: *"Pushpa Ramakrishnan"
>> *To: *"Xintong Song"
>> *Cc: *"dev", "User"
>> *Date: *Monday, 2023-07-03 20:36:30
>> *Subject: *Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award
>>
>> Congratulations 🥳
>>
>> On 03-Jul-2023, at 3:30 PM, Xintong Song  wrote:
>>
>> 
>> Dear Community,
>>
>> I'm pleased to share this good news with everyone. As some of you may
>> have already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].
>>
>> "Apache Flink greatly expanded the use of stream data-processing." --
>> SIGMOD Awards Committee
>>
>> SIGMOD is one of the most influential data management research
>> conferences in the world. The Systems Award is awarded to an individual or
>> set of individuals to recognize the development of a software or hardware
>> system whose technical contributions have had significant impact on the
>> theory or practice of large-scale data management systems. Winning of the
>> award indicates the high recognition of Flink's technological advancement
>> and industry influence from academia.
>>
>> As an open-source project, Flink wouldn't have come this far without the
>> wide, active and supportive community behind it. Kudos to all of us who
>> helped make this happen, including the over 1,400 contributors and many
>> others who contributed in ways beyond code.
>>
>> Best,
>>
>> Xintong (on behalf of the Flink PMC)
>>
>>
>> [1] https://sigmod.org/2023-sigmod-systems-award/
>>
>>
>>


Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread Jing Ge via user
Congratulations!

Best regards,
Jing


On Mon, Jul 3, 2023 at 3:21 PM yuxia  wrote:

> Congratulations!
>
> Best regards,
> Yuxia
>
> --
> *From: *"Pushpa Ramakrishnan"
> *To: *"Xintong Song"
> *Cc: *"dev", "User"
> *Date: *Monday, 2023-07-03 20:36:30
> *Subject: *Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award
>
> Congratulations 🥳
>
> On 03-Jul-2023, at 3:30 PM, Xintong Song  wrote:
>
> 
> Dear Community,
>
> I'm pleased to share this good news with everyone. As some of you may have
> already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].
>
> "Apache Flink greatly expanded the use of stream data-processing." --
> SIGMOD Awards Committee
>
> SIGMOD is one of the most influential data management research conferences
> in the world. The Systems Award is awarded to an individual or set of
> individuals to recognize the development of a software or hardware system
> whose technical contributions have had significant impact on the theory or
> practice of large-scale data management systems. Winning of the award
> indicates the high recognition of Flink's technological advancement and
> industry influence from academia.
>
> As an open-source project, Flink wouldn't have come this far without the
> wide, active and supportive community behind it. Kudos to all of us who
> helped make this happen, including the over 1,400 contributors and many
> others who contributed in ways beyond code.
>
> Best,
>
> Xintong (on behalf of the Flink PMC)
>
>
> [1] https://sigmod.org/2023-sigmod-systems-award/
>
>
>


Difference between different values for starting offset

2023-07-03 Thread Oscar Perez via user
Hei,

Looking at the flink documentation for kafkasource I see the following
values for starting offset:

OffsetsInitializer.earliest
OffsetsInitializer.latest
OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)

From what I understand, OffsetsInitializer.earliest uses the earliest offset the
first time, but later deployments will use the committed offset in the Flink
state to resume from there. If that is the case, what is the difference
between OffsetsInitializer.earliest and
committedOffsets(OffsetResetStrategy.EARLIEST) if both continue from the
committed offset after redeployment?

Thanks!
Oscar


Using HybridSource

2023-07-03 Thread Oscar Perez via user
Hei, We want to bootstrap some data from a CSV file before reading from a
kafka topic that has a retention period of 7 days.

We believe the best tool for that would be the HybridSource but the problem
we are facing is that both datasources are of different nature. The
KafkaSource returns a protobuf event while the CSV is a POJO with just 3
fields.

We could hack the KafkaSource implementation and do the mapping from
protobuf to the CSV POJO in the value deserializer, but that seems rather
hackish. Is there a more elegant way to unify the data types of both
sources using HybridSource?

thanks
Oscar
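For illustration, a hedged sketch of the usual pattern: HybridSource requires both sources to produce the same record type, so each source maps its input to a shared POJO inside its own deserializer. `Order`, `CsvOrderFormat`, and `ProtobufToOrderDeserializer` are hypothetical names standing in for a common POJO, a CSV stream format, and a Kafka deserializer that converts the protobuf event:

```java
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.fs.Path;

public class BootstrapThenKafkaExample {
    public static HybridSource<Order> build() {
        // Bounded CSV source: its stream format turns each line into an Order.
        FileSource<Order> csvSource = FileSource
            .forRecordStreamFormat(new CsvOrderFormat(),
                                   new Path("s3://bucket/orders.csv"))  // placeholder path
            .build();

        // Unbounded Kafka source: the deserializer converts the protobuf
        // event into the same Order type, instead of exposing the protobuf.
        KafkaSource<Order> kafkaSource = KafkaSource.<Order>builder()
            .setBootstrapServers("broker:9092")   // placeholder
            .setTopics("orders")                  // placeholder
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(new ProtobufToOrderDeserializer())
            .build();

        // HybridSource reads the file to its end, then switches to Kafka.
        return HybridSource.builder(csvSource)
            .addSource(kafkaSource)
            .build();
    }
}
```

This keeps the KafkaSource itself untouched; only the deserialization schemas know about the two input formats.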


Re: Identifying a flink dashboard

2023-07-03 Thread Mike Phillips

Thanks Alex,

Whilst there are good solutions, it looks like I am not going to be able 
to do anything useful.
The port changes due to a whole bunch of useful scripts that create port 
forwards for various things, starting from 3 (that's just a number 
plucked out of the air):
services like mongo, psql, other services, etc.
We would need to keep some sort of map between the flink dashboards and 
the port numbers; not impossible, but a bit of an admin nightmare when 
adding/removing dashboards.


I will think some more

On 01/07/2023 00:34, Alexander Fedulov wrote:
> 3 - Not possible, the dashboards are not accessible via the 
internet, so we use kube and port forward, URL looks like 
http://wobbegong:3/ the port changes
Well, if the port changes, you can differentiate between the bookmarks 
- just need to consistently use the same port when port forwarding to 
the same cluster.


On Thu, 29 Jun 2023 at 08:51, Schwalbe Matthias 
 wrote:


Hi Mike,

Let me sketch it:

  * The trick I use (no idea if it is wise or not ) is to have
nginx-ingress set up and then specify a service selecting the
nginx…controller pods [1]
  * You don’t need to bind to the node address (see externalIPs);
you could just as well port-forward this service, but
the ingresses that specify the nginx-ingress all relay over
that same service, each using a different https path
  * I’ll give an example configuration for
flink-kubernetes-operator FlinkDeployment [2]
  o template: is patched with the namespace and job name
  o unfortunately, annotations: does not support templating
(yet?),
  o i.e. you need to manually replace the path which must be
the same as what comes out of template:
  o put in the  whatever you like (that was your
original question )
  * I work on a local VM with microk8s, so specifying that as
externalIPs allows me to access it, however I also need to
register this IP as local.ingress in my hosts file, and accept
the certificate in the browser …
  * In your case you could either expose that service with a port
forward and also get the certificate and DNS business solved
  * This is the result on my machine:

[1] service-exposing-nginx-ingress-on-node.yaml :

apiVersion: v1

kind: Service

metadata:

  name: nginx-ingress-microk8s-service

  namespace: ingress

  labels:

    app: nginx-ingress

spec:

ports:

    - port: 8095

  targetPort: http

  protocol: TCP

name: http

    - port: 8444

  targetPort: https

  protocol: TCP

  name: https

  selector:

    name: nginx-ingress-microk8s

externalIPs:

    - xxx.xxx.xxx.xxx

[2] basic.ingress.yaml :




#  Licensed to the Apache Software Foundation (ASF) under one

#  or more contributor license agreements.  See the NOTICE file

#  distributed with this work for additional information

#  regarding copyright ownership.  The ASF licenses this file

#  to you under the Apache License, Version 2.0 (the

#  "License"); you may not use this file except in compliance

#  with the License.  You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

#  Unless required by applicable law or agreed to in writing, software

#  distributed under the License is distributed on an "AS IS" BASIS,

#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.

#  See the License for the specific language governing permissions and

# limitations under the License.




apiVersion: flink.apache.org/v1beta1 

kind: FlinkDeployment

metadata:

  name: basic-ingress

  namespace: flink

spec:

  image: flink:1.16

  flinkVersion: v1_16

  ingress:

    template: "ingress.local/{{namespace}}/{{name}}(/|$)(.*)"

    className: "nginx"

    annotations:

      nginx.ingress.kubernetes.io/use-regex: "true"

      nginx.ingress.kubernetes.io/rewrite-target: "/$2"

      nginx.ingress.kubernetes.io/configuration-snippet: |

        proxy_set_header Accept-Encoding "";

        sub_filter_last_modified off;

        sub_filter '' '';

        sub_filter 'Apache Flink Web Dashboard' 'flink: basic-ingress Dashboard';

  flinkConfiguration:

    taskmanager.numberOfTaskSlots: "2"

  serviceAccount: flink

  jobManager:

    resource:

  memory: 

Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread yuxia
Congratulations! 

Best regards, 
Yuxia 


From: "Pushpa Ramakrishnan"
To: "Xintong Song"
Cc: "dev", "User"
Date: Monday, 2023-07-03 20:36:30
Subject: Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

Congratulations 🥳



On 03-Jul-2023, at 3:30 PM, Xintong Song  wrote: 






Dear Community, 

I'm pleased to share this good news with everyone. As some of you may have 
already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1]. 

"Apache Flink greatly expanded the use of stream data-processing." -- SIGMOD 
Awards Committee 

SIGMOD is one of the most influential data management research conferences in 
the world. The Systems Award is awarded to an individual or set of individuals 
to recognize the development of a software or hardware system whose technical 
contributions have had significant impact on the theory or practice of 
large-scale data management systems. Winning of the award indicates the high 
recognition of Flink's technological advancement and industry influence from 
academia. 

As an open-source project, Flink wouldn't have come this far without the wide, 
active and supportive community behind it. Kudos to all of us who helped make 
this happen, including the over 1,400 contributors and many others who 
contributed in ways beyond code. 



Best, 

Xintong (on behalf of the Flink PMC) 




[1] https://sigmod.org/2023-sigmod-systems-award/






Re: [ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread Pushpa Ramakrishnan

Congratulations 🥳

> On 03-Jul-2023, at 3:30 PM, Xintong Song  wrote:
> 
> 
> Dear Community,
> 
> I'm pleased to share this good news with everyone. As some of you may have 
> already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].
> 
> "Apache Flink greatly expanded the use of stream data-processing." -- SIGMOD 
> Awards Committee
> 
> SIGMOD is one of the most influential data management research conferences in 
> the world. The Systems Award is awarded to an individual or set of 
> individuals to recognize the development of a software or hardware system 
> whose technical contributions have had significant impact on the theory or 
> practice of large-scale data management systems. Winning of the award 
> indicates the high recognition of Flink's technological advancement and 
> industry influence from academia.
> 
> As an open-source project, Flink wouldn't have come this far without the 
> wide, active and supportive community behind it. Kudos to all of us who 
> helped make this happen, including the over 1,400 contributors and many 
> others who contributed in ways beyond code.
> 
> Best,
> Xintong (on behalf of the Flink PMC)
> 
> [1] https://sigmod.org/2023-sigmod-systems-award/


the new state serializer can not be incompatible

2023-07-03 Thread 湘晗刚
Hi team: when restoring from a savepoint I hit the error
"the new state serializer can not be incompatible".



Who can help me?


Thanks in advance
Kobe24

Unsubscribe

2023-07-03 Thread 周勃
Unsubscribe

[ANNOUNCE] Apache Flink has won the 2023 SIGMOD Systems Award

2023-07-03 Thread Xintong Song
Dear Community,

I'm pleased to share this good news with everyone. As some of you may have
already heard, Apache Flink has won the 2023 SIGMOD Systems Award [1].

"Apache Flink greatly expanded the use of stream data-processing." --
SIGMOD Awards Committee

SIGMOD is one of the most influential data management research conferences
in the world. The Systems Award is awarded to an individual or set of
individuals to recognize the development of a software or hardware system
whose technical contributions have had significant impact on the theory or
practice of large-scale data management systems. Winning of the award
indicates the high recognition of Flink's technological advancement and
industry influence from academia.

As an open-source project, Flink wouldn't have come this far without the
wide, active and supportive community behind it. Kudos to all of us who
helped make this happen, including the over 1,400 contributors and many
others who contributed in ways beyond code.

Best,

Xintong (on behalf of the Flink PMC)


[1] https://sigmod.org/2023-sigmod-systems-award/


Flink 1.16: FilterPushDown and parallelism for stream-table joins

2023-07-03 Thread Chai Kelun
There is a Kafka stream table logClient(id int, name string, price double) and a
customConnector dimension table product(id int, name string, value double) that
implements SupportsFilterPushDown, plus a custom function MyUDF(double d1, double d2)
implementing custom comparison logic and supporting push-down of this operator.
In the Stream-Table Join scenario, the SQL below does not push the operator down;
instead, all operators are pulled up to the Join node via TableScan and evaluated
there. Is there an option that enables the push-down? (Similar to a
nestedloop-join, pushing the computation down to the product table's data source.)
SELECT A.id, A.name, B.name FROM logClient AS A, product AS B WHERE
MyUDF(B.value, A.price) < xxx;
Also, both Kafka and the customConnector support parallelism. For the join we would
like a BROADCAST mode that evaluates the product table against each partition of the
logClient stream table, but currently Flink's stream-table join distribution mode
seems to support only SINGLETON and HASH[KEY] (the switch case at
StreamExecExchange.java line 106). Will the community consider supporting more
distributionTypes in the future?

Many thanks!

Re: Very long launch of the Flink application in BATCH mode

2023-07-03 Thread Brendan Cortez
Thanks, guys. I tried version 1.17.1, but the problem still remains. It seems
to be a bug; I created an issue:
https://issues.apache.org/jira/browse/FLINK-32513.

On Thu, 29 Jun 2023 at 10:57, Martijn Visser 
wrote:

> Hi Vladislav,
>
> I think it might be worthwhile to upgrade to Flink 1.17, given the
> improvements that have been made in Flink 1.16 and 1.17 on batch
> processing. See for example the release notes of 1.17, with an entire
> section on batch processing
> https://flink.apache.org/2023/03/23/announcing-the-release-of-apache-flink-1.17/#batch-processing
>
> Best regards,
>
> Martijn
>
> On Wed, Jun 28, 2023 at 7:27 PM Vladislav Keda 
> wrote:
>
>> Hi Shammon,
>>
>> When I set log.level=DEBUG I have no more logs except  *2023-06-21
>> 14:51:30,921 DEBUG
>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] -
>> Trigger heartbeat request.*
>>
>> Job freezes on stream graph generation. In STREAMING mode the job starts
>> fast without same problems.
>>
>> ср, 28 июн. 2023 г. в 06:44, Shammon FY :
>>
>>> Hi Brendan,
>>>
>>> I think you need to confirm at which stage the job is blocked: is the
>>> client still submitting the job, is the ResourceManager scheduling it, or
>>> are tasks launching in the TM? You may need to provide more information to
>>> help us figure out the issue.
>>>
>>> Best,
>>> Shammon FY
>>>
>>> On Tuesday, June 27, 2023, Weihua Hu  wrote:
>>>
 Hi, Brendan

 From the log, it looks like it's still invoking your main method. You
 can add more logs in the main method to figure out which part takes too
 long.

 Best,
 Weihua


 On Tue, Jun 27, 2023 at 5:06 AM Brendan Cortez <
 brendan.cortez...@gmail.com> wrote:

> No, I'm using a collection source + 20 same JDBC lookups + Kafka sink.
>
> On Mon, 26 Jun 2023 at 19:17, Yaroslav Tkachenko 
> wrote:
>
>> Hey Brendan,
>>
>> Do you use a file source by any chance?
>>
>> On Mon, Jun 26, 2023 at 4:31 AM Brendan Cortez <
>> brendan.cortez...@gmail.com> wrote:
>>
>>> Hi all!
>>>
>>> I'm trying to submit a Flink Job in Application Mode in the
>>> Kubernetes cluster.
>>>
>>> I see some problems when an application has a big number of
>>> operators (more than 20 same operators) - it freezes for ~6 minutes 
>>> after
>>> *2023-06-21 15:46:45,082 WARN
>>>  org.apache.flink.connector.kafka.sink.KafkaSinkBuilder   [] - 
>>> Property
>>> [transaction.timeout.ms ] not specified.
>>> Setting it to PT1H*
>>>  and until
>>>
>>> *2023-06-21 15:53:20,002 INFO
>>>  org.apache.flink.streaming.api.graph.StreamGraphGenerator[] - 
>>> Disabled
>>> Checkpointing. Checkpointing is not supported and not needed when 
>>> executing
>>> jobs in BATCH mode.*(logs in attachment)
>>>
>>> When I set log.level=DEBUG, I see only this message each 10 seconds:
>>> *2023-06-21 14:51:30,921 DEBUG
>>> org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager 
>>> [] -
>>> Trigger heartbeat request.*
>>>
>>> Please, could you help me understand the cause of this problem and
>>> how to fix it. I use the Flink 1.15.3 version.
>>>
>>> Thank you in advance!
>>>
>>> Best regards,
>>> Brendan Cortez.
>>>
>>


Re: Flink TCP socket custom source - savepoint cannot be taken

2023-07-03 Thread Martijn Visser
Hi Kamal,

There's no such limitation, so most likely this is related to the
implementation of your TCP source connector. Do keep in mind that, just by
the nature of TCP, I doubt that you will have any guarantees when it comes
to this source. E.g. if you roll back to a savepoint of one day ago, how
will you be able to retrieve the data from your TCP source that's exactly
the same as it would have been a day ago?

Best regards,

Martijn

On Sat, Jul 1, 2023 at 7:33 AM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> I have a requirement to read data arriving over a TCP socket stream, and for
> this I have written a custom source function that reads from a TCP socket.
>
>
>
> The job runs successfully, but when trying to take a savepoint, an error
> says the savepoint cannot be taken.
>
>
>
> Is there any limitation that a TCP stream's state can't be captured (by a
> checkpoint or savepoint) and restored later?
>
>
>
> Rgds,
>
> Kamal
>
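One frequent cause of savepoints stalling with a legacy custom SourceFunction is holding the checkpoint lock across a blocking socket read, so the snapshot thread can never acquire it; records should be read outside the lock and the lock taken only to emit. This is a hedged guess about the reported symptom, and the sketch below is plain Java with a simulated read, not the Flink API:

```java
import java.util.concurrent.locks.ReentrantLock;

// Toy illustration of the checkpoint-lock pattern: if the source thread
// holds the lock while "reading", the snapshot thread cannot acquire it.
public class CheckpointLockDemo {
    public static void main(String[] args) throws Exception {
        ReentrantLock checkpointLock = new ReentrantLock();

        // Anti-pattern: lock held across the (simulated) blocking read.
        checkpointLock.lock();
        boolean snapshotWhileReading = snapshotThreadCanLock(checkpointLock);
        checkpointLock.unlock();

        // Correct pattern: read outside the lock, lock only to emit.
        String record = "bytes-from-socket"; // simulated blocking read, done unlocked
        checkpointLock.lock();
        // emit(record) would happen here, briefly, under the lock
        checkpointLock.unlock();
        boolean snapshotAfterEmit = snapshotThreadCanLock(checkpointLock);

        System.out.println(snapshotWhileReading); // false: savepoint would stall
        System.out.println(snapshotAfterEmit);    // true: savepoint can proceed
    }

    // Simulate the checkpointing thread attempting to take a snapshot.
    static boolean snapshotThreadCanLock(ReentrantLock lock) throws InterruptedException {
        final boolean[] acquired = new boolean[1];
        Thread t = new Thread(() -> {
            if (lock.tryLock()) { acquired[0] = true; lock.unlock(); }
        });
        t.start();
        t.join();
        return acquired[0];
    }
}
```

If the real source does block inside the locked section, restructuring it along these lines is worth trying before digging deeper.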


Query Regarding Optimisation of Timer Management in Flink CEP (Version 1.17)

2023-07-03 Thread Puneet Duggal
Hi,

I am currently working with Flink CEP version 1.17, and I am in the process of 
load testing for potential memory leaks related to checkpoint data. While 
analyzing the CepOperator code, I have come across a particular pattern 
regarding timer registration and event processing that I believe could 
potentially be optimized.
Here is the current behavior I observed:

- For every incoming event, CepOperator registers a timer with a timestamp t1.
- When that timer fires at t1, CepOperator processes all the elements that
have timestamps less than the current watermark.
- This includes events with timestamps equal to t1, but can also include events
with timestamps greater than t1 yet still less than the current watermark.


While this behaviour is functional, it leads me to the following query: since
the events with timestamps greater than t1 and less than the watermark are
already being processed by CepOperator, would it not be beneficial to delete
the timers associated with these events? We already remove the timestamp-to-events
mapping (and event counts) via the advanceTime method, so why not remove the timers
associated with those timestamps as well?

By deleting these timers, I believe we could reduce memory consumption and CPU 
utilization, which could be especially beneficial for high-throughput scenarios 
or when working with constrained resources.

I would greatly appreciate your insights on this matter, and if this is 
something that could be considered for optimization within the Flink CEP 
library.
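The proposed optimization can be sketched with a toy timer queue (plain Java; the names are illustrative and this is not CepOperator's real data structure). The idea: when the first timer fires at watermark w, one sweep handles every buffered timestamp <= w and drops the now-redundant timers, instead of letting each fire later as a no-op:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy event-time timer queue illustrating the proposed timer cleanup.
public class TimerSweepDemo {
    static final TreeMap<Long, List<String>> buffered = new TreeMap<>();
    static final TreeSet<Long> timers = new TreeSet<>();
    static int processedBatches = 0;

    static void onEvent(long ts, String event) {
        buffered.computeIfAbsent(ts, k -> new ArrayList<>()).add(event);
        timers.add(ts); // one timer per event timestamp, as described above
    }

    static void onWatermark(long w) {
        while (!timers.isEmpty() && timers.first() <= w) {
            timers.pollFirst();
            // Everything up to the watermark is due, not just this timestamp.
            SortedMap<Long, List<String>> due = buffered.headMap(w, true);
            if (!due.isEmpty()) {
                processedBatches++;
                // The optimization: delete timers for timestamps already swept.
                timers.removeAll(new ArrayList<>(due.keySet()));
                due.clear();
            }
        }
    }

    public static void main(String[] args) {
        onEvent(1, "a");
        onEvent(2, "b");
        onEvent(3, "c");
        onWatermark(5);
        System.out.println(processedBatches); // 1: a single sweep handled all three
        System.out.println(timers.size());    // 0: redundant timers were deleted
    }
}
```

Without the removeAll call, the timers for timestamps 2 and 3 would still fire and find nothing to do, which is exactly the overhead the question is about.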



SQL-gateway Failed to Run

2023-07-03 Thread Xiaolong Wang
Hi,
I've tested the Flink SQL gateway by running some simple Hive queries but
ran into some exceptions.


Environment Description:
Run on : Kubernetes
Deployment Mode: Session Mode (created by a flink-kubernetes-operator)
Steps to run:
1. Apply a `flinkdeployment` of flink session cluster to flink operator
```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster-example
  namespace: xxx
spec:
  image: xxx/flink:1.17-sql-gateway-dev
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    pipeline.max-parallelism: "1000"
    state.backend.type: rocksdb
    state.backend.incremental: "true"
    state.checkpoints.dir: xxx
    execution.checkpointing.interval: 1m
    execution.checkpointing.timeout: 30m
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: xxx
    akka.framesize: 20971520b
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    taskmanager.memory.managed.fraction: "0.2"
    kubernetes.hadoop.conf.config-map.name: xxx
  serviceAccount: default
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      serviceAccount: default
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 1
```
This image has been built with a `hadoop dependency` and an existing `hadoop
configmap`.

2. Login to the job-manager pod and run the followings
`./bin/sql-gateway.sh start-foreground
-Dsql-gateway.endpoint.type=hiveserver2
-Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/opt/flink/hive-conf`

3. Start a beeline and connect to the SQL gateway then run a simple Hive
query
`select count(1) from simple_demo_output where dt = '2021-08-14';`

4. The SQL gateway fails with the following logs:
```

2023-07-03 06:27:11,078 INFO
org.apache.flink.client.program.rest.RestClusterClient
  [] - Submitting job 'collect' (4c99c40392cb935d3df94891655d2ce5).

2023-07-03 06:27:15,092 INFO
org.apache.flink.client.program.rest.RestClusterClient
  [] - Successfully submitted job 'collect'
(4c99c40392cb935d3df94891655d2ce5) to '
http://flink-session-cluster-example-rest.realtime-streaming:8081'.

2023-07-03 06:27:15,879 ERROR
org.apache.flink.table.gateway.service.operation.OperationManager [] -
Failed to execute the operation 7613f663-8641-428c-b3d2-ec77a12fa6ee.

org.apache.flink.table.api.TableException: Failed to execute sql

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:974)
~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1422)
~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:431)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:195)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
~[flink-sql-gateway-1.17.1.jar:1.17.1]

at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]

at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]

at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]

at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]

at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]

at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]

at java.lang.Thread.run(Unknown Source) [?:?]

Caused by: org.apache.flink.util.FlinkException: Failed to execute job
'collect'.

at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2212)
~[flink-dist-1.17.1.jar:1.17.1]

at
org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
~[flink-table-planner_2.12-1.17.1.jar:1.17.1]

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:955)
~[flink-table-api-java-uber-1.17.1.jar:1.17.1]

... 13 more

Caused by: java.lang.RuntimeException: Error while waiting for job to be
initialized

at
org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:151)
~[flink-dist-1.17.1.jar:1.17.1]

at