Re: Fail to deploy Flink on minikube

2020-09-02 Thread Yang Wang
Sorry, I forgot that the JobManager binds its RPC address to
flink-jobmanager, not to its IP address.
So you also need to update jobmanager-session-deployment.yaml with the
following changes.

...
  containers:
  - name: jobmanager
    env:
    - name: JM_IP
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    image: flink:1.11
    args: ["jobmanager", "$(JM_IP)"]
...

After that, the JobManager will bind its RPC address to its IP.
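
For example, you could roll the change out like this (assuming you kept the
manifest file names from the Flink documentation):

kubectl apply -f jobmanager-session-deployment.yaml
kubectl rollout status deployment/flink-jobmanager
# recreate the TaskManager pod so it re-registers against the new address
kubectl delete pod -l component=taskmanager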

Best,
Yang


superainbower wrote on Thu, Sep 3, 2020 at 11:38 AM:

> Hi Yang,
> I updated taskmanager-session-deployment.yaml like this:
>
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-taskmanager
> spec:
>   replicas: 1
>   selector:
> matchLabels:
>   app: flink
>   component: taskmanager
>   template:
> metadata:
>   labels:
> app: flink
> component: taskmanager
> spec:
>   containers:
>   - name: taskmanager
> image:
> registry.cn-hangzhou.aliyuncs.com/superainbower/flink:1.11.1
> args: ["taskmanager","-Djobmanager.rpc.address=172.18.0.5"]
> ports:
> - containerPort: 6122
>   name: rpc
> - containerPort: 6125
>   name: query-state
> livenessProbe:
>   tcpSocket:
> port: 6122
>   initialDelaySeconds: 30
>   periodSeconds: 60
> volumeMounts:
> - name: flink-config-volume
>   mountPath: /opt/flink/conf/
> securityContext:
>   runAsUser:   # refers to user _flink_ from official flink
> image, change if necessary
>   volumes:
>   - name: flink-config-volume
> configMap:
>   name: flink-config
>   items:
>   - key: flink-conf.yaml
> path: flink-conf.yaml
>   - key: log4j-console.properties
> path: log4j-console.properties
>   imagePullSecrets:
> - name: regcred
>
> Then I deleted the TaskManager pod and let it restart, but the logs print this:
>
> Could not resolve ResourceManager address akka.tcp://
> flink@172.18.0.5:6123/user/rpc/resourcemanager_*, retrying in 1 ms:
> Could not connect to rpc endpoint under address akka.tcp://
> flink@172.18.0.5:6123/user/rpc/resourcemanager_*
>
> It changed flink-jobmanager to 172.18.0.5.
> superainbower
> superainbo...@163.com
>
>
> On 09/3/2020 11:09, Yang Wang wrote:
>
> I guess something is wrong with your kube proxy, which prevents the
> TaskManager from connecting to the JobManager.
> You could verify this by directly using JobManager Pod ip instead of
> service name.
>
> Please do as follows.
> * Edit the TaskManager deployment (via kubectl edit deployment flink-taskmanager) and
> update the args field to the following:
>    args: ["taskmanager", "-Djobmanager.rpc.address=172.18.0.5"]
> given that "172.18.0.5" is the JobManager pod IP.
> * Delete the current TaskManager pod and let it restart.
> * Now check the TaskManager logs to see whether it registers
> successfully.
>
>
>
> Best,
> Yang
>
superainbower wrote on Thu, Sep 3, 2020 at 9:35 AM:
>
>> Hi Till,
>> I found something that may be helpful.
>> The Kubernetes Dashboard shows the job-manager IP as 172.18.0.5 and the
>> task-manager IP as 172.18.0.6.
>> When I run 'kubectl exec -ti flink-taskmanager-74c68c6f48-jqpbn
>> -- /bin/bash' and then 'ping 172.18.0.5',
>> I get a response.
>> But when I ping flink-jobmanager, there is no response.
>>
>> superainbower
>> superainbo...@163.com
>>
>>
>> On 09/3/2020 09:03, superainbower wrote:
>>
>> Hi Till,
>> This is the taskManager log
>> As you can see, the logs first print ‘line 92 -- Could not connect to
>> flink-jobmanager:6123’,
>> then print ‘line 128 -- Could not resolve ResourceManager address
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
>> retrying in 1 ms: Could not connect to rpc endpoint under address
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.’
>> and keep repeating this.
>>
>> A few minutes later, the TaskManager shuts down and restarts.
>>
>> These are my yaml files; could you help me confirm whether I
>> omitted something? Thanks a lot!
>> ---
>> flink-configuration-configmap.yaml
>> apiVersion: v1
>> kind: ConfigMap
>> metadata:
>>   name: flink-config
>>   labels:
>> app: flink
>> data:
>>   flink-con

Re: Fail to deploy Flink on minikube

2020-09-02 Thread superainbower
Hi Yang,
I updated taskmanager-session-deployment.yaml like this:


apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
matchLabels:
  app: flink
  component: taskmanager
  template:
metadata:
  labels:
app: flink
component: taskmanager
spec:
  containers:
  - name: taskmanager
image: registry.cn-hangzhou.aliyuncs.com/superainbower/flink:1.11.1
args: ["taskmanager","-Djobmanager.rpc.address=172.18.0.5"]
ports:
- containerPort: 6122
  name: rpc
- containerPort: 6125
  name: query-state
livenessProbe:
  tcpSocket:
port: 6122
  initialDelaySeconds: 30
  periodSeconds: 60
volumeMounts:
- name: flink-config-volume
  mountPath: /opt/flink/conf/
securityContext:
runAsUser:   # refers to user _flink_ from official flink image, change if necessary
  volumes:
  - name: flink-config-volume
configMap:
  name: flink-config
  items:
  - key: flink-conf.yaml
path: flink-conf.yaml
  - key: log4j-console.properties
path: log4j-console.properties
  imagePullSecrets:
- name: regcred


Then I deleted the TaskManager pod and let it restart, but the logs print this:


Could not resolve ResourceManager address 
akka.tcp://flink@172.18.0.5:6123/user/rpc/resourcemanager_*, retrying in 1 
ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@172.18.0.5:6123/user/rpc/resourcemanager_*


It changed flink-jobmanager to 172.18.0.5.
superainbower
superainbo...@163.com


On 09/3/2020 11:09, Yang Wang wrote:
I guess something is wrong with your kube proxy, which prevents the TaskManager
from connecting to the JobManager.
You could verify this by directly using JobManager Pod ip instead of service 
name.


Please do as follows.
* Edit the TaskManager deployment (via kubectl edit deployment flink-taskmanager) and
update the args field to the following:
   args: ["taskmanager", "-Djobmanager.rpc.address=172.18.0.5"]
given that "172.18.0.5" is the JobManager pod IP.
* Delete the current TaskManager pod and let it restart.
* Now check the TaskManager logs to see whether it registers successfully.






Best,
Yang


superainbower wrote on Thu, Sep 3, 2020 at 9:35 AM:

Hi Till,
I found something that may be helpful.
The Kubernetes Dashboard shows the job-manager IP as 172.18.0.5 and the task-manager IP as
172.18.0.6.
When I run 'kubectl exec -ti flink-taskmanager-74c68c6f48-jqpbn --
/bin/bash' and then 'ping 172.18.0.5',
I get a response.
But when I ping flink-jobmanager, there is no response.


superainbower
superainbo...@163.com


On 09/3/2020 09:03, superainbower wrote:
Hi Till,
This is the TaskManager log.
As you can see, the logs first print ‘line 92 -- Could not connect to
flink-jobmanager:6123’,
then print ‘line 128 -- Could not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in
1 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.’ and keeps
repeating this.


A few minutes later, the TaskManager shuts down and restarts.


These are my yaml files; could you help me confirm whether I omitted something?
Thanks a lot!
---
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
app: flink
data:
  flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 1024m
parallelism.default: 1
  log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Polici

Re: FileSystemHaServices and BlobStore

2020-09-02 Thread Yang Wang
Hi Alexey,

Thanks for the feedback. You are right. StatefulSet + PersistentVolume +
FileSystemHaService could be another
bundle of services for Flink HA support on K8s. The user jars could be
built into the image, downloaded by an init-container,
or mounted via the PV, so they do not need to be recovered from HA storage.
But I think the checkpoint path and counter
should be persisted so that we can recover from the latest checkpoint.
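
As a rough sketch, the configuration could then look like this (the factory class
name is just a placeholder for your own FileSystemHaServices implementation; only
high-availability.storageDir and state.checkpoints.dir are standard Flink options):

high-availability: com.example.flink.FileSystemHaServicesFactory
high-availability.storageDir: gs://my-bucket/flink/ha
state.checkpoints.dir: gs://my-bucket/flink/checkpoints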


Best,
Yang

Alexey Trenikhun wrote on Wed, Sep 2, 2020 at 7:36 AM:

> Judging from FLIP-19 (thank you Roman for the link), of the 3 use cases (jars,
> RPC messages and log files) only jar files need the HA guarantee, and in my
> particular case, a job cluster with the jar as part of the image, it seems it doesn't
> matter. I guess that explains why in my test I was able to recover from
> failure even with VoidBlobStore. I also use StatefulSet instead of Deployment.
>
> Thanks,
> Alexey
>
> --
> *From:* Yang Wang 
> *Sent:* Tuesday, September 1, 2020 1:58 AM
> *To:* dev 
> *Cc:* Alexey Trenikhun ; Flink User Mail List <
> user@flink.apache.org>
> *Subject:* Re: FileSystemHaServices and BlobStore
>
> Hi Alexey,
>
> Glad to hear that you are interested in the K8s HA support.
>
> Roman's answer is just on point.
>
> "FileSystemBlobStore" is trying to store the user jars, job graph, etc. on
> the distributed storage(e.g. HDFS, S3, GFS). So when the
> JobManager failover, it could fetch the blob data from remote storage. It
> is very important for standalone and Yarn deployment since
> the local blob store is ephemeral and will be cleaned up after JobManager
> terminated.
>
> However, in your case, benefit from the K8s persistent volume, all the
> local blob data could be recovered after JobManager pod restarted.
> Then you could find that the jobs are recovered and keeps to running.
> Please also remember that the checkpoint meta and counter also
> need to be stored in local file. After then the Flink job could recover
> from the latest checkpoint successfully.
>
> > About the FileSystemHaService
> I am a little skeptical about this feature. Currently, we are using a K8s
> deployment for the JobManager, and it is not always guaranteed that only
> one JobManager is running. For example, the kubelet may be down and never be
> brought up again. I am trying to work on another ticket, "native K8s HA" [1],
> in which we will get a fully functional HA service, including leader
> election/retrieval, jobgraph meta store, checkpoint meta store, running
> registry, etc.
> It could be used for standalone K8s and native K8s deployments.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-12884
>
> Best,
> Yang
>
>
> Khachatryan Roman wrote on Mon, Aug 31, 2020 at 8:52 PM:
>
> + dev
>
> Blob store is used for jars, serialized job, and task information and logs.
> You can find some information at
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-19%3A+Improved+BLOB+storage+architecture
>
>
> I guess in your setup, Flink was able to pick up local files.
> HA setup presumes that Flink can survive the loss of that JM host and its
> local files.
>
> I'm not sure about K8s native setup - probably VoidBlobStore is enough if
> there is a persistent volume.
> But in the general case, FileSystemBlobStore should be used to store files
> on some DFS.
>
>
> Regards,
> Roman
>
>
> On Sat, Aug 29, 2020 at 6:42 PM Alexey Trenikhun  wrote:
>
> > I did a test with a streaming job and FileSystemHaServices using VoidBlobStore
> > (no HA blob); it looks like the job was able to recover from both a JM restart and
> > a TM restart. Any idea in what use cases the HA blob store is needed?
> >
> > Thanks,
> > Alexey
> > --
> > *From:* Alexey Trenikhun 
> > *Sent:* Friday, August 28, 2020 11:31 AM
> > *To:* Khachatryan Roman 
> > *Cc:* Flink User Mail List 
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Motivation is to have k8s HA setup without extra component - Zookeeper,
> > see [1]
> >
> > The purpose of the BlobStore is vague to me - what kind of BLOBs are stored? It looks
> > like if we start the job from a savepoint, then persistence of the BlobStore is
> > not necessary, but is it needed if we recover from a checkpoint?
> >
> > Thanks,
> > Alexey
> >
> > [1]. https://issues.apache.org/jira/browse/FLINK-17598
> >
> >
> > --
> > *From:* Khachatryan Roman 
> > *Sent:* Friday, August 28, 2020 9:24 AM
> > *To:* Alexey Trenikhun 
> > *Cc:* Flink User Mail List 
> > *Subject:* Re: FileSystemHaServices and BlobStore
> >
> > Hello Alexey,
> >
> > I think you need FileSystemBlobStore as you are implementing HA Services,
> > and BLOBs should be highly available too.
> > However, I'm a bit concerned about the direction in general: it
> > essentially means re-implementing ZK functionality on top of FS.
> > What are the motivation and the use case?
> >
> > Regards,
> > Roman
> >
> >
> > On Fri, Aug 28, 2020 at 5:15 PM Alexey Trenikhun  wrote:
> >
> > Hello,
> > I'm thinking about implementing FileSystemHaServices - single leader, but
> > pe

Re: Fail to deploy Flink on minikube

2020-09-02 Thread Yang Wang
I guess something is wrong with your kube proxy, which prevents the TaskManager
from connecting to the JobManager.
You could verify this by directly using JobManager Pod ip instead of
service name.

Please do as follows.
* Edit the TaskManager deployment (via kubectl edit deployment flink-taskmanager) and
update the args field to the following:
   args: ["taskmanager", "-Djobmanager.rpc.address=172.18.0.5"]
given that "172.18.0.5" is the JobManager pod IP.
* Delete the current TaskManager pod and let it restart.
* Now check the TaskManager logs to see whether it registers
successfully.
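
For example (the label selectors and deployment name assume the yaml files from
the Flink documentation; adjust them if yours differ):

# look up the JobManager pod IP
kubectl get pod -l component=jobmanager -o wide

# point the TaskManager at that IP (here 172.18.0.5) and recreate its pod
kubectl patch deployment flink-taskmanager --type=json \
  -p='[{"op":"replace","path":"/spec/template/spec/containers/0/args","value":["taskmanager","-Djobmanager.rpc.address=172.18.0.5"]}]'
kubectl delete pod -l component=taskmanager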



Best,
Yang

superainbower wrote on Thu, Sep 3, 2020 at 9:35 AM:

> Hi Till,
> I found something that may be helpful.
> The Kubernetes Dashboard shows the job-manager IP as 172.18.0.5 and the task-manager IP as
> 172.18.0.6.
> When I run 'kubectl exec -ti flink-taskmanager-74c68c6f48-jqpbn --
> /bin/bash' and then 'ping 172.18.0.5',
> I get a response.
> But when I ping flink-jobmanager, there is no response.
>
> superainbower
> superainbo...@163.com
>
>
> On 09/3/2020 09:03, superainbower wrote:
>
> Hi Till,
> This is the TaskManager log.
> As you can see, the logs first print ‘line 92 -- Could not connect to
> flink-jobmanager:6123’,
> then print ‘line 128 -- Could not resolve ResourceManager address
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
> retrying in 1 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.’ and keep
> repeating this.
>
> A few minutes later, the TaskManager shuts down and restarts.
>
> These are my yaml files; could you help me confirm whether I omitted something?
> Thanks a lot!
> ---
> flink-configuration-configmap.yaml
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: flink-config
>   labels:
> app: flink
> data:
>   flink-conf.yaml: |+
> jobmanager.rpc.address: flink-jobmanager
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> queryable-state.proxy.ports: 6125
> jobmanager.memory.process.size: 1024m
> taskmanager.memory.process.size: 1024m
> parallelism.default: 1
>   log4j-console.properties: |+
> rootLogger.level = INFO
> rootLogger.appenderRef.console.ref = ConsoleAppender
> rootLogger.appenderRef.rolling.ref = RollingFileAppender
> logger.akka.name = akka
> logger.akka.level = INFO
> logger.kafka.name= org.apache.kafka
> logger.kafka.level = INFO
> logger.hadoop.name = org.apache.hadoop
> logger.hadoop.level = INFO
> logger.zookeeper.name = org.apache.zookeeper
> logger.zookeeper.level = INFO
> appender.console.name = ConsoleAppender
> appender.console.type = CONSOLE
> appender.console.layout.type = PatternLayout
> appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> appender.rolling.name = RollingFileAppender
> appender.rolling.type = RollingFile
> appender.rolling.append = false
> appender.rolling.fileName = ${sys:log.file}
> appender.rolling.filePattern = ${sys:log.file}.%i
> appender.rolling.layout.type = PatternLayout
> appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
> appender.rolling.policies.type = Policies
> appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
> appender.rolling.policies.size.size=100MB
> appender.rolling.strategy.type = DefaultRolloverStrategy
> appender.rolling.strategy.max = 10
> logger.netty.name =
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
> logger.netty.level = OFF
> ---
> jobmanager-service.yaml
> apiVersion: v1
> kind: Service
> metadata:
>   name: flink-jobmanager
> spec:
>   type: ClusterIP
>   ports:
>   - name: rpc
> port: 6123
>   - name: blob-server
> port: 6124
>   - name: webui
> port: 8081
>   selector:
> app: flink
> component: jobmanager
> --
> jobmanager-session-deployment.yaml
> apiVersion: apps/v1
> kind: Deployment
> metadata:
>   name: flink-jobmanager
> spec:
>   replicas: 1
>   selector:
> matchLabels:
>   app: flink
>   component: jobmanager
>   template:
> metadata:
>   labels:
> app: flink
> component: jobmanager
> spec:
>   containers:
>   - name: jobmanager
> image:
> registry.cn-hangzhou.aliyuncs.com/superainbower/flink:1.11.1
> args: ["jobmanager"]
> ports:
> - containerPort: 6123
>   name: rpc
> - 

Re: Use of slot sharing groups causing workflow to hang

2020-09-02 Thread Yangze Guo
Hi,

The failure to acquire slots is usually caused by a lack of
resources. If you put part of the workflow into a specific slot sharing
group, it may require more slots to run the workflow than before.
Could you share the logs of the ResourceManager and SlotManager? I think
there are more clues in them.
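
As a rough illustration with your numbers (assuming the default behavior that
every slot sharing group needs as many slots as the maximum parallelism of its
operators): with everything in the "default" group the job needs
max(3, 2, 1) = 3 slots, while after moving part of the workflow into its own
group it needs 3 (new group) + 2 (remaining "default" group) = 5 slots.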

Best,
Yangze Guo

On Thu, Sep 3, 2020 at 4:39 AM Ken Krugler  wrote:
>
> Hi all,
>
> I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally 
> (via Eclipse), with a parallelism of either 3 or 6.
>
> If I set up part of the workflow to use a specific (not “default”) slot 
> sharing group with a parallelism of 3, and the remaining portions of the 
> workflow have a parallelism of either 1 or 2, then the workflow never starts 
> running, and eventually fails due to a slot request not being fulfilled in 
> time.
>
> So I’m wondering how best to debug this.
>
> I don’t see any information (even at DEBUG level) being logged about which 
> operators are in what slot sharing group, or which slots are assigned to what 
> groups.
>
> Thanks,
>
> — Ken
>
> PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712, and 
> tried the approach of setting # of slots in the config, but that didn’t 
> change anything. I see that issue is still open, so wondering what Till and 
> Konstantin have to say about it.
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>


Re: Fail to deploy Flink on minikube

2020-09-02 Thread superainbower
Hi Till,
I found something that may be helpful.
The Kubernetes Dashboard shows the job-manager IP as 172.18.0.5 and the task-manager IP as
172.18.0.6.
When I run 'kubectl exec -ti flink-taskmanager-74c68c6f48-jqpbn --
/bin/bash' and then 'ping 172.18.0.5',
I get a response.
But when I ping flink-jobmanager, there is no response.
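
For example, name resolution of the service can be checked from inside the
TaskManager pod like this (the pod name is the one above; whether nslookup is
available depends on the image, getent hosts usually is):

kubectl exec -ti flink-taskmanager-74c68c6f48-jqpbn -- getent hosts flink-jobmanager
kubectl exec -ti flink-taskmanager-74c68c6f48-jqpbn -- nslookup flink-jobmanager
# and check that the cluster DNS itself is healthy
kubectl -n kube-system get pods -l k8s-app=kube-dns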


superainbower
superainbo...@163.com


On 09/3/2020 09:03, superainbower wrote:
Hi Till,
This is the TaskManager log.
As you can see, the logs first print ‘line 92 -- Could not connect to
flink-jobmanager:6123’,
then print ‘line 128 -- Could not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in
1 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.’ and keeps
repeating this.


A few minutes later, the TaskManager shuts down and restarts.


These are my yaml files; could you help me confirm whether I omitted something?
Thanks a lot!
---
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
app: flink
data:
  flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 1024m
parallelism.default: 1
  log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
logger.netty.name = 
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
---
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
port: 6123
  - name: blob-server
port: 6124
  - name: webui
port: 8081
  selector:
app: flink
component: jobmanager
--
jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
matchLabels:
  app: flink
  component: jobmanager
  template:
metadata:
  labels:
app: flink
component: jobmanager
spec:
  containers:
  - name: jobmanager
image: registry.cn-hangzhou.aliyuncs.com/superainbower/flink:1.11.1
args: ["jobmanager"]
ports:
- containerPort: 6123
  name: rpc
- containerPort: 6124
  name: blob-server
- containerPort: 8081
  name: webui
livenessProbe:
  tcpSocket:
port: 6123
  initialDelaySeconds: 30
  periodSeconds: 60
volumeMounts:
- name: flink-config-volume
  mountPath: /opt/flink/conf
securityContext:
  runAsUser:   # refers to user _flink_ from official flink image, change if necessary
  volumes:
  - name: flink-config-volume
configMap:
  name: flink-config
  items:
  - key: flink-conf.yaml
path: flink-conf.yaml
  - key: log4j-console.properties
path: log4j-console.properties
  imagePullSecrets:
- name: regcred
---
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
matchLabels:
  app: flink
  component: taskmanager
  template:
metadata:
  labels:
app: flink
component: taskmanager
spec:
  containers:
  - name: taskmanager
  

Re: Fail to deploy Flink on minikube

2020-09-02 Thread superainbower
Hi Till,
This is the TaskManager log.
As you can see, the logs first print ‘line 92 -- Could not connect to
flink-jobmanager:6123’,
then print ‘line 128 -- Could not resolve ResourceManager address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in
1 ms: Could not connect to rpc endpoint under address
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.’ and keeps
repeating this.


A few minutes later, the TaskManager shuts down and restarts.


These are my yaml files; could you help me confirm whether I omitted something?
Thanks a lot!
---
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
app: flink
data:
  flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
queryable-state.proxy.ports: 6125
jobmanager.memory.process.size: 1024m
taskmanager.memory.process.size: 1024m
parallelism.default: 1
  log4j-console.properties: |+
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender
rootLogger.appenderRef.rolling.ref = RollingFileAppender
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10
logger.netty.name = 
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
---
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
port: 6123
  - name: blob-server
port: 6124
  - name: webui
port: 8081
  selector:
app: flink
component: jobmanager
--
jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
matchLabels:
  app: flink
  component: jobmanager
  template:
metadata:
  labels:
app: flink
component: jobmanager
spec:
  containers:
  - name: jobmanager
image: registry.cn-hangzhou.aliyuncs.com/superainbower/flink:1.11.1
args: ["jobmanager"]
ports:
- containerPort: 6123
  name: rpc
- containerPort: 6124
  name: blob-server
- containerPort: 8081
  name: webui
livenessProbe:
  tcpSocket:
port: 6123
  initialDelaySeconds: 30
  periodSeconds: 60
volumeMounts:
- name: flink-config-volume
  mountPath: /opt/flink/conf
securityContext:
  runAsUser:   # refers to user _flink_ from official flink image, change if necessary
  volumes:
  - name: flink-config-volume
configMap:
  name: flink-config
  items:
  - key: flink-conf.yaml
path: flink-conf.yaml
  - key: log4j-console.properties
path: log4j-console.properties
  imagePullSecrets:
- name: regcred
---
taskmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
matchLabels:
  app: flink
  component: taskmanager
  template:
metadata:
  labels:
app: flink
component: taskmanager
spec:
  containers:
  - name: taskmanager
image: registry.cn-hangzhou.aliyuncs.com/superainbower/flink:1.11.1
args: ["taskmanager"]
ports:
- containerPort: 6122
  name: rpc
- containerPort: 6125
  name: query-state
livenessProbe:
  tcpSocket:
port: 6122
  initialDelaySeconds: 30
  periodSeconds: 60
volumeMounts:
- name: flink-config-volum

Use of slot sharing groups causing workflow to hang

2020-09-02 Thread Ken Krugler
Hi all,

I’ve got a streaming workflow (using Flink 1.11.1) that runs fine locally (via 
Eclipse), with a parallelism of either 3 or 6.

If I set up part of the workflow to use a specific (not “default”) slot sharing 
group with a parallelism of 3, and the remaining portions of the workflow have 
a parallelism of either 1 or 2, then the workflow never starts running, and 
eventually fails due to a slot request not being fulfilled in time.

So I’m wondering how best to debug this.

I don’t see any information (even at DEBUG level) being logged about which 
operators are in what slot sharing group, or which slots are assigned to what 
groups.

Thanks,

— Ken

PS - I’ve looked at https://issues.apache.org/jira/browse/FLINK-8712 
, and tried the approach of 
setting # of slots in the config, but that didn’t change anything. I see that 
issue is still open, so wondering what Till and Konstantin have to say about it.

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



Re: Default Flink Metrics Graphite

2020-09-02 Thread Vijayendra Yadav
Hi Till,

Info below; I also have a question at the end.
It was pretty much what was suggested earlier - for 1.10.0 use: metrics.reporter.grph.class:
org.apache.flink.metrics.graphite.GraphiteReporter
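
The full reporter section in flink-conf.yaml then looks roughly like this (host,
port and interval are just example values for my local netcat test):

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
metrics.reporter.grph.interval: 60 SECONDS
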
And then it looked like my Graphite installation had a problem or I was missing
something. Just to make sure the Flink reporter itself had no problem, I pointed the
Graphite reporter config at localhost and a local port, started
listening on that port using netcat, and was able to see metrics:

vy0769@ubuntu:~/IdeaProjects/vdcs-kafka-flink-ingestion$ nc -lv -p 
Listening on [0.0.0.0] (family 0, port )

localhost.jobmanager.Socket-Window-WordCount.downtime 0 1599063945
localhost.jobmanager.Socket-Window-WordCount.fullRestarts 0 1599063945
localhost.jobmanager.Socket-Window-WordCount.lastCheckpointAlignmentBuffered -1 1599063945

I have a question about the above: there are two values, e.g. 0 1599063945. Is the
value 1599063945 the timestamp?
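(From what I can tell, the Graphite plaintext protocol sends one line per metric
in the form <metric.path> <value> <unix-epoch-seconds>, which would make
1599063945 an epoch timestamp - please confirm.)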

Regards,
Vijay

On Wed, Sep 2, 2020 at 2:06 AM Till Rohrmann  wrote:

> Hi Vijayendra,
>
> what was the problem in the end? Sharing this knowledge might help other
> community member who might run into the same issue.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 3:34 AM Vijayendra Yadav 
> wrote:
>
>> Thanks all, I could see the metrics.
>>
>> On Thu, Aug 27, 2020 at 7:51 AM Robert Metzger 
>> wrote:
>>
>>> I don't think these error messages give us a hint why you can't see the
>>> metrics (because they are about registering metrics, not reporting them)
>>>
>>> Are you sure you are using the right configuration parameters for Flink
>>> 1.10? That all required JARs are in the lib/ folder (on all machines) and
>>> that your graphite setup is working (have you confirmed that you can show
>>> any metrics in the Graphite UI (maybe from a Graphite demo thingy))?
>>>
>>>
>>> On Thu, Aug 27, 2020 at 2:05 AM Vijayendra Yadav 
>>> wrote:
>>>
 Hi Chesnay and Dawid,

 I see multiple entries as following in Log:

 2020-08-26 23:46:19,105 WARN
 org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while
 registering metric: numRecordsIn.
 java.lang.IllegalArgumentException: A metric named
 ip-99--99-99.taskmanager.container_1596056409708_1570_01_06.vdcs-kafka-flink-test.Map.0.numRecordsIn
 already exists
 at
 com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
 2020-08-26 23:46:19,094 WARN
 org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while
 registering metric: numRecordsOut.
 java.lang.IllegalArgumentException: A metric named
 ip-99--99-999.taskmanager.container_1596056409708_1570_01_05.vdcs-kafka-flink-test.Map.2.numRecordsOut
 already exists
 at
 com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
 at
 org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
 at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
 at
 org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
 at
 org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:343)
 at
 org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:426)
 at
 org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:359)
 at
 org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:349)
 at
 org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.(OperatorIOMetricGroup.java:41)
 at
 org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.(OperatorMetricGroup.java:48)
 at
 org.apache.flink.runtime.metrics.groups.TaskMetricGroup.lambda$getOrAddOperator$0(TaskMetricGroup.java:154)
 at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
 at
 org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:154)
 at
 org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:180)
 at
 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
 at
 org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:75)
 at
 org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:429)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
 at
 org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:144)
 at
 org.apache.flink.st

Re: Task Chaining slots performance

2020-09-02 Thread Vijayendra Yadav
Thanks for the information Till

Regards,
Vijay

> 
> On Sep 2, 2020, at 2:21 AM, Till Rohrmann  wrote:
> 
> 
> Hi Vijayendra,
> 
> in the general case, I believe that chaining will almost always give you 
> better performance since you consume fewer resources, avoid context switches 
> between threads and if object reuse is enabled even avoid serialization when 
> records are passed from one operator to another.
> 
> The only scenario I can think of where disabling chaining might be 
> beneficial, is when you have a pipeline of operator where each operator 
> performs a blocking operation (e.g. interacts with some external systems). If 
> these operators are chained, then the processing time of a single record 
> would be n * time of blocking operation. If you disabled chaining in this 
> scenario, then these waiting times could overlap between different records 
> (the first operator can already start processing the next record, while the 
> second operator waits for the external operation to finish for the first 
> record). That way, once the pipeline is fully filled, the processing time of 
> a single record would be time of the longest blocking operation.
> 
> Cheers,
> Till
> 
>> On Wed, Sep 2, 2020 at 2:54 AM Vijayendra Yadav  
>> wrote:
>> Hi Team,
>> 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>>  
>> 
>> 
>> Flink is chaining my tasks, e.g.: stream.map().filter().map()
>> 
>> I think the entire chain runs in the same slot here.
>> 
>> The documentation says Flink does chaining for better performance, but are 
>> there any scenarios where we should disable chaining or start a new chain, mainly for the 
>> purpose of better performance?
>> 
>> StreamExecutionEnvironment.disableOperatorChaining()
>> 
>> someStream.filter(...).map(...).startNewChain().map(...)
>> someStream.map(...).disableChaining()


Re: PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-02 Thread Xingbo Huang
Hi Manas,

As Till said, you need to check whether the execution environment used is
LocalStreamEnvironment. You need to get the class object corresponding to
the underlying Java object through py4j. You can take a look at the
example I wrote below; I hope it will help you.

```
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from py4j.java_gateway import get_java_class


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    table_env = StreamTableEnvironment.create(
        env, environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())
    gateway = get_gateway()

    # get the execution environment class
    env_class = table_env._j_tenv.getPlanner().getExecEnv().getClass()

    # get the LocalStreamEnvironment class
    local_stream_environment_class = get_java_class(
        gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment)
    print(env_class == local_stream_environment_class)


if __name__ == '__main__':
    test()

```


Best,
Xingbo

Till Rohrmann wrote on Wed, Sep 2, 2020 at 5:03 PM:

> Hi Manas,
>
> I am not entirely sure but you might try to check whether
> env._j_stream_execution_environment is an instance of
> gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment
> via Python's isinstance function.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 5:46 AM Manas Kale  wrote:
>
>> Hi Xingbo,
>> Thank you for clarifying that. I am indeed maintaining a different
>> version of the code by commenting those lines, but I was just wondering if
>> it was possible to detect the environment programmatically.
>>
>> Regards,
>> Manas
>>
>> On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang  wrote:
>>
>>> Hi Manas,
>>>
>>> When running locally, you need
>>> `ten_sec_summaries.get_job_client().get_job_execution_result().result()` to
>>> wait for the job to finish. However, when you submit to the cluster, you need to
>>> delete this code. In my opinion, the currently feasible solution is that you
>>> prepare two versions of the code for this, although this is annoying. After all,
>>> running jobs locally is usually for testing, so it should be acceptable to
>>> prepare different code.
>>> In the long run, it should be the Flink framework that behaves differently
>>> according to different environments, so that users don't need to
>>> prepare different code.
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale wrote on Tue, Sep 1, 2020 at 3:00 PM:
>>>
 Hi,
 I am trying to submit a pyFlink job in detached mode using the command:

 ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
 flink-sql-connector-kafka_2.11-1.11.0.jar

 The jobs are submitted successfully but the command does not return. I
 realized that was because I had the following line in
 basic_streaming_job.py:

 ten_sec_summaries.get_job_client().get_job_execution_result().result()

 This statement is useful when testing this locally within a minicluster
 (using python basic_streaming_job.py) but not needed when the job is
 submitted to a cluster.

 So I would like to programmatically detect if the
 StreamExecutionEnvironment is a localStreamEnvironment and execute the
 above snippet accordingly. How do I do this?


 Thanks,
 Manas

>>>


Re: A couple of question for Stateful Functions

2020-09-02 Thread Dan Pettersson
Thanks for your quick reply.

/Dan

On Wed, Sep 2, 2020 at 12:24 PM, Igal Shilman wrote:

> Hi Dan, let me try to answer your questions:
>
>
>> I guess my question is if one can
>> freely mix Flink core with SF's code with regards to performance,
>> fault-tolerance, and checkpointing?
>
>
> The main limitation at the moment is that SF currently supports
> processing-time watermark semantics only; event time is not supported, as it
> is difficult to reason about completion in the presence of loops.
> Other than that, with respect to fault-tolerance and checkpointing, StateFun
> is built on top of the DataStream API, so the same guarantees apply to SF
> as well.
>
>
>> I'm unable to use Protobuf so POJO's is going to be processed. Is there a
>> large performance impact of using Pojos instead of protos in SF?
>
>
> The only supported message data type for SF is Protocol Buffers, and I
> would highly recommend using that.
> One option is to transform your Pojo into a Protobuf just before entering
> SF, and you can convert it back to your original Pojo when exiting from SF.
> If you absolutely have to use something else, you can fall back to kryo [1]
> or provide your own [2], but then schema evolution of your messages and
> state is not guaranteed anymore, and you
> lose the ability to communicate with remote functions.
>
> Would the appendingBuffer be
>> de-/serialized for each function invocation?
>
>
> The appending buffer supports efficient appends (the buffer is *not*
> deserialized on every function invocation)
> In fact, it is backed by Flink's ListState[3]
>
> [1] -
> https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L62
> [2] -
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java
> [3] -
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-keyed-state
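>
> For illustration, a rough sketch with the embedded Java SDK (MyEvent is a
> placeholder type; only the append path is shown):
>
> import org.apache.flink.statefun.sdk.Context;
> import org.apache.flink.statefun.sdk.StatefulFunction;
> import org.apache.flink.statefun.sdk.annotations.Persisted;
> import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
>
> public final class BufferingFn implements StatefulFunction {
>
>   @Persisted
>   private final PersistedAppendingBuffer<MyEvent> buffer =
>       PersistedAppendingBuffer.of("events", MyEvent.class);
>
>   @Override
>   public void invoke(Context context, Object input) {
>     // appending does not read back or deserialize the existing elements
>     buffer.append((MyEvent) input);
>     // on a count or time trigger you would consume buffer.view() and then buffer.clear()
>   }
> }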
>
> Thanks,
> Igal
>
> On Wed, Sep 2, 2020 at 10:51 AM Till Rohrmann 
> wrote:
>
>> Hi Dan,
>>
>> thanks for reaching out to the Flink community. I'm pulling in Gordon and
>> Igal who will be able to answer your questions.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 2, 2020 at 8:22 AM danp  wrote:
>>
>>> Hi,
>>>
>>> Nice to see the progress of Stateful functions.
>>>
>>> I have a few questions that I hope you can reply to.
>>>
>>> My first question is regarding the newly implemented
>>> StatefulFunctionDataStreamBuilder.
>>> Is there anything to pay attention to if one first unions a couple of
>>> streams
>>> and performs a sort via a keyBy and a KeyedProcessFunction before
>>> dispatching the messages via RoutableMessage?
>>> In this sorting I'm using a MapState, and I guess my question is whether one
>>> can
>>> freely mix Flink core with SF's code with regards to performance,
>>> fault-tolerance, and checkpointing?
>>>
>>> I'm unable to use Protobuf so POJO's is going to be processed. Is there a
>>> large performance impact of using Pojos instead of protos in SF?
>>>
>>> Also, I wonder if there's going to be a severe performance penalty if I
>>> had a
>>> function that was called very often, let's say 1000 times a second, and held a
>>> PersistedAppendingBuffer with those objects appended for each message?
>>> Then,
>>> when the 1001st message comes or a time trigger kicks in, I would write
>>> everything to disk and delete the state. Would the appending buffer be
>>> de-/serialized for each function invocation?
>>>
>>> If yes, is there any workaround for this so the data is just held in RAM?
>>>
>>> Thanks,
>>>
>>> Regards
>>> Dan
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Till Rohrmann
The logs don't look suspicious. Could you maybe check what the write
bandwidth to your GCS bucket is from the machine you are running Flink on?
It should be enough to generate a 200 MB file and write it to GCS. Thanks a
lot for your help in debugging this matter.
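
For example, something like the following would already give a rough number (the
bucket name is a placeholder):

dd if=/dev/zero of=/tmp/gcs-test.bin bs=1M count=200
time gsutil cp /tmp/gcs-test.bin gs://<your-bucket>/gcs-test.bin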

Cheers,
Till

On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur  wrote:

> Hi,
>
> Thanks for the response. Yes, we are running Flink in HA mode. We checked
> that there are no such quota limits on GCS for us. Please find the logs below;
> here you can see that the copying of the blob started at 11:50:39,455 and the
> JobGraph submission arrived at 11:50:46,400.
>
> 2020-09-01 11:50:37,061 DEBUG
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
> the idle timeout.
> 2020-09-01 11:50:37,061 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
> 2020-09-01 11:50:37,062 DEBUG
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
> the idle timeout.
> 2020-09-01 11:50:37,062 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
> 2020-09-01 11:50:37,305 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:37,305 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:37,354 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
> 2020-09-01 11:50:37,354 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
> 2020-09-01 11:50:39,455 DEBUG
> org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
> from
> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
> to
> gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
> 2020-09-01 11:50:43,904 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x30be3d929102460 after 2ms
> 2020-09-01 11:50:46,400 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
> 2020-09-01 11:50:46,403 INFO
>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
> 2020-09-01 11:50:46,405 DEBUG
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,325 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Trigger heartbeat request.
> 2020-09-01 11:50:47,330 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
> 2020-09-01 11:50:47,331 DEBUG
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
> 2020-09-01 11:50:52,880 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> notification sessionid:0x30be3d929102460
> 2020-09-01 11:50:52,880 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> WatchedEvent state:SyncConnected type:NodeChildrenChanged
> path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
> 2020-09-01 11:50:52,882 INFO
>  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
> Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.
>
> Thank You.
>
> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann  wrote:
>
>> Hi Prakhar,
>>
>> have you enabled HA for your cluster? If yes, then Flink will try to
>> store the job graph to the configured high-availability.storageDir in order
>> to be able to recover it. If this operation takes long, then it is either
>> the filesystem which is slow or storing the pointer in ZooKeeper. If it is
>> the filesystem, then I would suggest to check whether you have some
>> r

Re: Fail to deploy Flink on minikube

2020-09-02 Thread Till Rohrmann
Hmm, this is indeed strange. Could you share the logs of the TaskManager
with us? Ideally you set the log level to debug. Thanks a lot.

Cheers,
Till

On Wed, Sep 2, 2020 at 12:45 PM art  wrote:

> Hi Till,
>
> The full information when I run command ' kubectl get all’  like this:
>
> NAME READY   STATUSRESTARTS   AGE
> pod/flink-jobmanager-85bdbd98d8-ppjmf1/1 Running   0  2m34s
> pod/flink-taskmanager-74c68c6f48-6jb5v   1/1 Running   0  2m34s
>
> NAME   TYPECLUSTER-IP  EXTERNAL-IP
> PORT(S)  AGE
> service/flink-jobmanager   ClusterIP   10.103.207.75   
>  6123/TCP,6124/TCP,8081/TCP   2m34s
> service/kubernetes ClusterIP   10.96.0.1   
>  443/TCP  5d2h
>
> NAMEREADY   UP-TO-DATE   AVAILABLE   AGE
> deployment.apps/flink-jobmanager1/1 11   2m34s
> deployment.apps/flink-taskmanager   1/1 11   2m34s
>
> NAME   DESIRED   CURRENT   READY
> AGE
> replicaset.apps/flink-jobmanager-85bdbd98d81 1 1
> 2m34s
> replicaset.apps/flink-taskmanager-74c68c6f48   1 1 1
> 2m34s
>
> And I can open the Flink UI, but the number of task managers is 0, so the job manager
> works well.
> I think the problem is that the taskmanager cannot register itself with the jobmanager;
>  did I miss some configuration?
>
>
> On Sep 2, 2020, at 5:24 PM, Till Rohrmann wrote:
>
> Hi art,
>
> could you check what `kubectl get services` returns? Usually if you run
> `kubectl get all` you should also see the services. But in your case there
> are no services listed. You should see something like
> service/flink-jobmanager, otherwise the flink-jobmanager service (K8s
> service) is not running.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 11:15 AM art  wrote:
>
>> Hi Till,
>>
>> I’m sure the jobmanager service is started; I can find it in the Kubernetes
>> Dashboard.
>>
>> When I run command ' kubectl get deployment’ I can got this:
>> flink-jobmanager1/1 11   33s
>> flink-taskmanager   1/1 11   33s
>>
>> When I run command ' kubectl get all’ I can got this:
>> NAME READY   STATUSRESTARTS   AGE
>> pod/flink-jobmanager-85bdbd98d8-ppjmf1/1 Running   0
>>  2m34s
>> pod/flink-taskmanager-74c68c6f48-6jb5v   1/1 Running   0
>>  2m34s
>>
>> So, I think flink-jobmanager works well, but the taskmanager is restarted
>> every few minutes.
>>
>> My minikube version: v1.12.3
>> Flink version:v1.11.1
>>
>> On Sep 2, 2020, at 4:27 PM, Till Rohrmann wrote:
>>
>> Hi art,
>>
>> could you verify that the jobmanager-service has been started? It looks
>> as if the name flink-jobmanager is not resolvable. It could also help to
>> know the Minikube and K8s version you are using.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 2, 2020 at 9:50 AM art  wrote:
>>
>>> Hi, I’m going to deploy Flink on minikube, referring to
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/kubernetes.html
>>> kubectl create -f flink-configuration-configmap.yaml
>>> kubectl create -f jobmanager-service.yaml
>>> kubectl create -f jobmanager-session-deployment.yaml
>>> kubectl create -f taskmanager-session-deployment.yaml
>>>
>>> But I got this
>>>
>>> 2020-09-02 06:45:42,664 WARN  akka.remote.ReliableDeliverySupervisor
>>>   [] - Association with remote system [
>>> akka.tcp://flink@flink-jobmanager:6123] has failed, address is now
>>> gated for [50] ms. Reason: [Association failed with [
>>> akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>>> [java.net.UnknownHostException: flink-jobmanager: Temporary failure in name
>>> resolution]
>>> 2020-09-02 06:45:42,691 INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
>>> not resolve ResourceManager address
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
>>> retrying in 1 ms: Could not connect to rpc endpoint under address
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
>>> 2020-09-02 06:46:02,731 INFO
>>>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
>>> not resolve ResourceManager address
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
>>> retrying in 1 ms: Could not connect to rpc endpoint under address
>>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
>>> 2020-09-02 06:46:12,731 INFO  akka.remote.transport.ProtocolStateActor
>>>   [] - No response from remote for outbound association.
>>> Associate timed out after [2 ms].
>>>
>>> And when I run the command 'kubectl exec -ti
>>> flink-taskmanager-74c68c6f48-9tkvd -- /bin/bash’ and then ‘ping flink-jobmanager’,
>>> I find I cannot ping flink-jobmanager from the taskmanager.
>>>
>>> I am new to k8s, can anyone give me some tutorial? Thanks a lot !
>>>
>>

Re: Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Prakhar Mathur
Hi,

Thanks for the response. Yes, we are running Flink in HA mode. We checked
that there are no such quota limits on GCS for us. Please find the logs below;
here you can see that the copying of the blob started at 11:50:39,455 and the
JobGraph submission arrived at 11:50:46,400.

2020-09-01 11:50:37,061 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
the idle timeout.
2020-09-01 11:50:37,061 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
2020-09-01 11:50:37,062 DEBUG
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
the idle timeout.
2020-09-01 11:50:37,062 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
2020-09-01 11:50:37,305 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:37,305 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:37,354 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:37,354 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:39,455 DEBUG
org.apache.flink.runtime.blob.FileSystemBlobStore - Copying
from
/tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
to
gs:///blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
2020-09-01 11:50:43,904 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
ping response for sessionid: 0x30be3d929102460 after 2ms
2020-09-01 11:50:46,400 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,403 INFO
 org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting
job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
2020-09-01 11:50:46,405 DEBUG
org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,325 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Trigger heartbeat request.
2020-09-01 11:50:47,330 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
2020-09-01 11:50:47,331 DEBUG
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
2020-09-01 11:50:52,880 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
notification sessionid:0x30be3d929102460
2020-09-01 11:50:52,880 DEBUG
org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
WatchedEvent state:SyncConnected type:NodeChildrenChanged
path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
2020-09-01 11:50:52,882 INFO
 org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.

Thank You.

On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann  wrote:

> Hi Prakhar,
>
> have you enabled HA for your cluster? If yes, then Flink will try to store
> the job graph to the configured high-availability.storageDir in order to be
> able to recover it. If this operation takes long, then it is either the
> filesystem which is slow or storing the pointer in ZooKeeper. If it is the
> filesystem, then I would suggest to check whether you have some read/write
> quotas which might slow the operation down.
>
> If you haven't enabled HA or persisting the jobGraph is not what takes
> long, then the next most likely candidate is the recovery from a previous
> checkpoint. Here again, Flink needs to read from the remote storage (in
> your case GCS). Depending on the size of the checkpoint and the read
> bandwidth, this can be faster or slower. The best way to figure out what
> takes long is to share the logs with us so that we can confirm what takes
> long

Re: Fail to deploy Flink on minikube

2020-09-02 Thread art
Hi Till,
  
The full output when I run the command 'kubectl get all' is like this:

NAME                                     READY   STATUS    RESTARTS   AGE
pod/flink-jobmanager-85bdbd98d8-ppjmf    1/1     Running   0          2m34s
pod/flink-taskmanager-74c68c6f48-6jb5v   1/1     Running   0          2m34s

NAME                       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
service/flink-jobmanager   ClusterIP   10.103.207.75   <none>        6123/TCP,6124/TCP,8081/TCP   2m34s
service/kubernetes         ClusterIP   10.96.0.1       <none>        443/TCP                      5d2h

NAME                                READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/flink-jobmanager    1/1     1            1           2m34s
deployment.apps/flink-taskmanager   1/1     1            1           2m34s

NAME                                           DESIRED   CURRENT   READY   AGE
replicaset.apps/flink-jobmanager-85bdbd98d8    1         1         1       2m34s
replicaset.apps/flink-taskmanager-74c68c6f48   1         1         1       2m34s

And I can open the Flink UI, but the number of task managers is 0, so the jobmanager itself works fine.
I think the problem is that the taskmanager cannot register itself with the jobmanager. Did I
miss some configuration?


> On Sep 2, 2020, at 5:24 PM, Till Rohrmann wrote:
> 
> Hi art,
> 
> could you check what `kubectl get services` returns? Usually if you run 
> `kubectl get all` you should also see the services. But in your case there 
> are no services listed. You should see something like service/flink-jobmanager;
> otherwise the flink-jobmanager service (K8s service) is not running.
> 
> Cheers,
> Till
> 
> On Wed, Sep 2, 2020 at 11:15 AM art wrote:
> Hi Till,
> 
> I’m sure the jobmanager-service is started; I can find it in the Kubernetes
> Dashboard
> 
> When I run the command 'kubectl get deployment' I get this:
> flink-jobmanager    1/1     1            1           33s
> flink-taskmanager   1/1     1            1           33s
> 
> When I run the command 'kubectl get all' I get this:
> NAME                                     READY   STATUS    RESTARTS   AGE
> pod/flink-jobmanager-85bdbd98d8-ppjmf    1/1     Running   0          2m34s
> pod/flink-taskmanager-74c68c6f48-6jb5v   1/1     Running   0          2m34s
> 
> So, I think flink-jobmanager works well, but the taskmanager is restarted every
> few minutes
> 
> My minikube version: v1.12.3
> Flink version:v1.11.1
> 
>> On Sep 2, 2020, at 4:27 PM, Till Rohrmann wrote:
>> 
>> Hi art,
>> 
>> could you verify that the jobmanager-service has been started? It looks as 
>> if the name flink-jobmanager is not resolvable. It could also help to know 
>> the Minikube and K8s version you are using.
>> 
>> Cheers,
>> Till
>> 
>> On Wed, Sep 2, 2020 at 9:50 AM art wrote:
>> Hi, I’m going to deploy Flink on minikube referring to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/kubernetes.html
>>
>> kubectl create -f flink-configuration-configmap.yaml
>> kubectl create -f jobmanager-service.yaml
>> kubectl create -f jobmanager-session-deployment.yaml
>> kubectl create -f taskmanager-session-deployment.yaml
>> 
>> But I got this
>> 
>> 2020-09-02 06:45:42,664 WARN  akka.remote.ReliableDeliverySupervisor 
>>   [] - Association with remote system 
>> [akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated 
>> for [50] ms. Reason: [Association failed with 
>> [akka.tcp://flink@flink-jobmanager:6123]] Caused by: 
>> [java.net.UnknownHostException: flink-jobmanager: Temporary failure in name 
>> resolution]
>> 2020-09-02 06:45:42,691 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could not 
>> resolve ResourceManager address 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, 
>> retrying in 1 ms: Could not connect to rpc endpoint under address 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
>> 2020-09-02 06:46:02,731 INFO  
>> org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could not 
>> resolve ResourceManager address 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, 
>> retrying in 1 ms: Could not connect to rpc endpoint under address 
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
>> 2020-09-02 06:46:12,731 INFO  akka.remote.transport.ProtocolStateActor   
>>   [] - No response from remote for outbound association. 
>> Associate timed out after [2 ms]. 
>> 
>> And when I run the command 'kubectl exec -ti 
>> flink-taskmanager-74c68c6f48-9tkvd -- /bin/bash’ && ‘ping flink-jobmanager’ 
>> , I find I cannot ping flink-jobmanager from taskmanager
>> 
>> I am new to k8s, can anyone give me some tutorial? Thanks a lot !
> 



Re: A couple of question for Stateful Functions

2020-09-02 Thread Igal Shilman
Hi Dan, let me try to answer your questions:


> I guess my question is if one can
> freely mix Flink core with SF's code with regards to performance,
> fault-tolerance, and checkpointing?


The main limitation at the moment is that SF currently requires processing-time
watermark semantics only; event time is not supported, as it is difficult to reason
about completion in the presence of loops.
Other than that, with respect to fault tolerance and checkpointing, StateFun is
built on top of the DataStream API, so the same guarantees apply to SF
as well.


> I'm unable to use Protobuf so POJO's is going to be processed. Is there a
> large performance impact of using Pojos instead of protos in SF?


The only supported message data type for SF is Protocol Buffers, and I would
highly recommend using that.
One option is to transform your Pojo into a Protobuf just before entering
SF, and you can convert it back to your original Pojo when exiting from SF.
If you absolutely have to use something else, you can fall back to kryo[1]
or provide your own[2], but then schema evolution of your messages and
state is not guaranteed anymore, and you
lose the ability to communicate with remote functions.
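
For illustration, here is a minimal sketch of that conversion step, assuming a
hypothetical SensorReading POJO and a SensorReadingProto message generated by
protoc from your own .proto file (both names are placeholders, not part of the
StateFun API):

import org.apache.flink.streaming.api.datastream.DataStream;

// POJO -> Protobuf just before handing the stream to the StateFun part of the job.
// pojoStream is assumed to be a DataStream<SensorReading> produced earlier.
DataStream<SensorReadingProto> protoIn = pojoStream.map(r ->
        SensorReadingProto.newBuilder()
                .setDeviceId(r.deviceId)
                .setValue(r.value)
                .build());

// Protobuf -> POJO again on the way out of StateFun (protoOut is the egress stream).
DataStream<SensorReading> pojoOut = protoOut.map(m ->
        new SensorReading(m.getDeviceId(), m.getValue()));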

Would the appendingBuffer be
> de-/serialized for each function invocation?


The appending buffer supports efficient appends (the buffer is *not*
deserialized on every function invocation).
In fact, it is backed by Flink's ListState[3].

[1] -
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-flink-datastream-example/src/main/java/org/apache/flink/statefun/examples/datastream/Example.java#L62
[2] -
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/message/MessagePayloadSerializer.java
[3] -
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-keyed-state
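
As a rough sketch of the buffering pattern described above (written from memory
of the StateFun 2.x Java SDK, so the exact method names should be checked against
your SDK version; MyEventProto and flushToStorage are placeholders):

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class BufferingFn implements StatefulFunction {

  @Persisted
  private final PersistedAppendingBuffer<MyEventProto> buffer =
      PersistedAppendingBuffer.of("events", MyEventProto.class);

  @Persisted
  private final PersistedValue<Integer> count =
      PersistedValue.of("count", Integer.class);

  @Override
  public void invoke(Context context, Object input) {
    buffer.append((MyEventProto) input);   // cheap append, no full deserialization
    int n = count.getOrDefault(0) + 1;
    if (n >= 1000) {
      flushToStorage(buffer.view());       // placeholder for writing the batch out
      buffer.clear();
      count.clear();
    } else {
      count.set(n);
    }
  }
}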

Thanks,
Igal

On Wed, Sep 2, 2020 at 10:51 AM Till Rohrmann  wrote:

> Hi Dan,
>
> thanks for reaching out to the Flink community. I'm pulling in Gordon and
> Igal who will be able to answer your questions.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 8:22 AM danp  wrote:
>
>> Hi,
>>
>> Nice to see the progress of Stateful functions.
>>
>> I have a few questions that I hope you can reply to.
>>
>> My first question is regarding the newly implemented
>> StatefulFunctionDataStreamBuilder.
>> Is there anything to pay attention to if one first union a couple of
>> streams
>> and performs a sort via a keyBy and a KeyedProcessFunction before
>> dispatching the messages to via RoutableMessage?
>> In this sorting I'm using a mapstate and I guess my question is if one can
>> freely mix Flink core with SF's code with regards to performance,
>> fault-tolerance, and checkpointing?
>>
>> I'm unable to use Protobuf so POJO's is going to be processed. Is there a
>> large performance impact of using Pojos instead of protos in SF?
>>
>> Also I wonder if there's going to be a severe performance penalty if I
>> had a
>> function that was called very often lets say 1000 a second and hold a
>> PersistentAppendingBuffer with those objects appended for each message?
>> Then
>> when the 1001:st message comes or a timetrigger kicks in, I would write
>> everything to disk and delete the state. Would the appendingBuffer be
>> de-/serialized for each function invocation?
>>
>> If yes, is there any workaround for this so the data is just held in RAM?
>>
>> Thanks,
>>
>> Regards
>> Dan
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Fail to deploy Flink on minikube

2020-09-02 Thread Till Rohrmann
Hi art,

could you check what `kubectl get services` returns? Usually if you run
`kubectl get all` you should also see the services. But in your case there
are no services listed. You should see something like
service/flink-jobmanager; otherwise the flink-jobmanager service (K8s
service) is not running.

Cheers,
Till

On Wed, Sep 2, 2020 at 11:15 AM art  wrote:

> Hi Till,
>
> I’m sure the jobmanager-service is started; I can find it in the Kubernetes
> Dashboard
>
> When I run the command 'kubectl get deployment' I get this:
> flink-jobmanager    1/1     1            1           33s
> flink-taskmanager   1/1     1            1           33s
>
> When I run the command 'kubectl get all' I get this:
> NAME                                     READY   STATUS    RESTARTS   AGE
> pod/flink-jobmanager-85bdbd98d8-ppjmf    1/1     Running   0          2m34s
> pod/flink-taskmanager-74c68c6f48-6jb5v   1/1     Running   0          2m34s
>
> So, I think flink-jobmanager works well, but the taskmanager is restarted
> every few minutes
>
> My minikube version: v1.12.3
> Flink version:v1.11.1
>
> On Sep 2, 2020, at 4:27 PM, Till Rohrmann wrote:
>
> Hi art,
>
> could you verify that the jobmanager-service has been started? It looks as
> if the name flink-jobmanager is not resolvable. It could also help to know
> the Minikube and K8s version you are using.
>
> Cheers,
> Till
>
> On Wed, Sep 2, 2020 at 9:50 AM art  wrote:
>
>> Hi, I’m going to deploy Flink on minikube referring to
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/kubernetes.html
>> kubectl create -f flink-configuration-configmap.yaml
>> kubectl create -f jobmanager-service.yaml
>> kubectl create -f jobmanager-session-deployment.yaml
>> kubectl create -f taskmanager-session-deployment.yaml
>>
>> But I got this
>>
>> 2020-09-02 06:45:42,664 WARN  akka.remote.ReliableDeliverySupervisor
>>   [] - Association with remote system [
>> akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
>> for [50] ms. Reason: [Association failed with [
>> akka.tcp://flink@flink-jobmanager:6123]] Caused by:
>> [java.net.UnknownHostException: flink-jobmanager: Temporary failure in name
>> resolution]
>> 2020-09-02 06:45:42,691 INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
>> not resolve ResourceManager address
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
>> retrying in 1 ms: Could not connect to rpc endpoint under address
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
>> 2020-09-02 06:46:02,731 INFO
>>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
>> not resolve ResourceManager address
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
>> retrying in 1 ms: Could not connect to rpc endpoint under address
>> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
>> 2020-09-02 06:46:12,731 INFO  akka.remote.transport.ProtocolStateActor
>>   [] - No response from remote for outbound association.
>> Associate timed out after [2 ms].
>>
>> And when I run the command 'kubectl exec -ti
>> flink-taskmanager-74c68c6f48-9tkvd -- /bin/bash’ && ‘ping flink-jobmanager’
>> , I find I cannot ping flink-jobmanager from taskmanager
>>
>> I am new to k8s, can anyone give me some tutorial? Thanks a lot !
>>
>
>


Re: Task Chaining slots performance

2020-09-02 Thread Till Rohrmann
Hi Vijayendra,

in the general case, I believe that chaining will almost always give you
better performance since you consume fewer resources, avoid context
switches between threads and if object reuse is enabled even avoid
serialization when records are passed from one operator to another.

The only scenario I can think of where disabling chaining might be
beneficial is when you have a pipeline of operators where each operator
performs a blocking operation (e.g. interacts with some external systems).
If these operators are chained, then the processing time of a single record
would be n * time of blocking operation. If you disabled chaining in this
scenario, then these waiting times could overlap between different records
(the first operator can already start processing the next record, while the
second operator waits for the external operation to finish for the first
record). That way, once the pipeline is fully filled, the processing time
of a single record would be time of the longest blocking operation.
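
As a purely illustrative sketch of that scenario (enrichViaHttp and writeToDb are
hypothetical blocking calls against external systems, not Flink APIs):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> source = env.socketTextStream("localhost", 9999);

source
    .map(value -> enrichViaHttp(value))   // blocking call #1
    .disableChaining()                    // run this operator in its own task, so that
    .map(value -> writeToDb(value))       // blocking call #2 can overlap with call #1
    .print();

env.execute("chaining-sketch");

With chaining left enabled, both blocking calls would run back to back in the same
task thread for every single record.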

Cheers,
Till

On Wed, Sep 2, 2020 at 2:54 AM Vijayendra Yadav 
wrote:

> Hi Team,
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>
>
>
> *Flink chaining my Tasks which is like: stream.map().filter().map() *
>
> *I think here the entire chain runs in the same slot.*
>
> *Documentation says flink does chaining for better performance, but are
> there any scenarios we should disable or start a new chain mainly for the
> purpose of better performance ?*
>
> StreamExecutionEnvironment.disableOperatorChaining()
>
> someStream.filter(...).map(...).startNewChain().map(...)
>
> someStream.map(...).disableChaining()
>
>


Re: Using S3 as a streaming File source

2020-09-02 Thread orionemail
OK thanks for the notice on the cost point. I will check the cost calculations.

This already does have SNS enabled for another solution to this problem, but
I'm trying to use the minimal amount of different software components at this
stage of the pipeline. My preferred approach would have been for them to send this
data directly to a Kinesis/Kafka stream, but that is not an option at this time.

Thanks for the assistance.

Sent with [ProtonMail](https://protonmail.com) Secure Email.

‐‐‐ Original Message ‐‐‐
On Tuesday, 1 September 2020 17:53, Ayush Verma  wrote:

> Word of caution. Streaming from S3 is really cost prohibitive as the only way 
> to detect new files is to continuously spam the S3 List API.
>
> On Tue, Sep 1, 2020 at 4:50 PM Jörn Franke  wrote:
>
>> Why don’t you get an S3 notification on SQS and do the actions from there?
>>
>> You will probably need to write the content of the files to a NoSQL
>> database.
>>
>> Alternatively send the s3 notification to Kafka and read flink from there.
>>
>> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
>>
>>> On 01.09.2020 at 16:46, orionemail wrote:
>>
>>> 
>>> Hi,
>>>
>>> I have a S3 bucket that is continuously written to by millions of devices. 
>>> These upload small compressed archives.
>>>
>>> What I want to do is treat the tar gzipped (.tgz) files as a streaming 
>>> source and process each archive. The archive contains three files that each 
>>> might need to be processed.
>>>
>>> I see that
>>>
>>> env.readFile(f, bucket, FileProcessingMode.PROCESS_CONTINUOUSLY, 1L).print();
>>>
>>> might do what I need, but I am unsure how best to implement 'f' - the 
>>> InputFileFormat. Is there a similar example for me to reference?
>>>
>>> Or is this idea not workable with this method? I need to ensure exactly 
>>> once, and also trigger removal of the files after processing.
>>>
>>> Thanks,
>>>
>>> Sent with [ProtonMail](https://protonmail.com) Secure Email.

Re: Default Flink Metrics Graphite

2020-09-02 Thread Till Rohrmann
Hi Vijayendra,

what was the problem in the end? Sharing this knowledge might help other
community members who might run into the same issue.

Cheers,
Till

On Wed, Sep 2, 2020 at 3:34 AM Vijayendra Yadav 
wrote:

> Thanks all, I could see the metrics.
>
> On Thu, Aug 27, 2020 at 7:51 AM Robert Metzger 
> wrote:
>
>> I don't think these error messages give us a hint why you can't see the
>> metrics (because they are about registering metrics, not reporting them)
>>
>> Are you sure you are using the right configuration parameters for Flink
>> 1.10? That all required JARs are in the lib/ folder (on all machines) and
>> that your graphite setup is working (have you confirmed that you can show
>> any metrics in the Graphite UI (maybe from a Graphite demo thingy))?
>>
>>
>> On Thu, Aug 27, 2020 at 2:05 AM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Chesnay and Dawid,
>>>
>>> I see multiple entries as following in Log:
>>>
>>> 2020-08-26 23:46:19,105 WARN
>>> org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while
>>> registering metric: numRecordsIn.
>>> java.lang.IllegalArgumentException: A metric named
>>> ip-99--99-99.taskmanager.container_1596056409708_1570_01_06.vdcs-kafka-flink-test.Map.0.numRecordsIn
>>> already exists
>>> at
>>> com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
>>> 2020-08-26 23:46:19,094 WARN
>>> org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while
>>> registering metric: numRecordsOut.
>>> java.lang.IllegalArgumentException: A metric named
>>> ip-99--99-999.taskmanager.container_1596056409708_1570_01_05.vdcs-kafka-flink-test.Map.2.numRecordsOut
>>> already exists
>>> at
>>> com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
>>> at
>>> org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
>>> at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
>>> at
>>> org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
>>> at
>>> org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:343)
>>> at
>>> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:426)
>>> at
>>> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:359)
>>> at
>>> org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:349)
>>> at
>>> org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:41)
>>> at
>>> org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:48)
>>> at
>>> org.apache.flink.runtime.metrics.groups.TaskMetricGroup.lambda$getOrAddOperator$0(TaskMetricGroup.java:154)
>>> at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
>>> at
>>> org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:154)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:180)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
>>> at
>>> org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:75)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:429)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:353)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Regards,
>>> Vijay
>>>
>>>
>>> On Wed, Aug 26, 2020 at 7:53 AM Chesnay Schepler 
>>> wrote:
>>>
 metrics.reporter.grph.class:
 org.apache.flink.metrics.graphite.GraphiteReporter


 https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#graphite-orgapacheflinkmetricsgraphitegraphitereporter

 On 26/08/2020 16:40, Vijayendra Yadav wrote:

 Hi Dawid,

 I have 1.10.0 version of flink. What is alternative for this version ?

 Regards,
 Vijay


 On Aug 25, 2020, at 11:44 PM, Dawid Wysakowicz 
  wrote:

 

 Hi Vijay,

 I think the problem might be that you are using a wrong version of the
 reporte

Re: PyFlink - detect if environment is a StreamExecutionEnvironment

2020-09-02 Thread Till Rohrmann
Hi Manas,

I am not entirely sure but you might try to check whether
env._j_stream_execution_environment is an instance of
gateway.jvm.org.apache.flink.streaming.api.environment.LocalStreamEnvironment
via Python's isinstance function.

Cheers,
Till

On Wed, Sep 2, 2020 at 5:46 AM Manas Kale  wrote:

> Hi Xingbo,
> Thank you for clarifying that. I am indeed maintaining a different version
> of the code by commenting those lines, but I was just wondering if it was
> possible to detect the environment programmatically.
>
> Regards,
> Manas
>
> On Wed, Sep 2, 2020 at 7:32 AM Xingbo Huang  wrote:
>
>> Hi Manas,
>>
>> When running locally, you need
>> `ten_sec_summaries.get_job_client().get_job_execution_result().result()` to
>> wait until the job finishes. However, when you submit to the cluster, you need to
>> delete this code. In my opinion, the currently feasible solution is that you
>> prepare two sets of code for this, although this is annoying. After all,
>> running jobs locally is usually for testing, so it should be acceptable to
>> prepare different code.
>> In the long run, it should be the Flink framework that behaves differently
>> according to the environment, so that users don't need to
>> prepare different code.
>>
>> Best,
>> Xingbo
>>
>> Manas Kale  于2020年9月1日周二 下午3:00写道:
>>
>>> Hi,
>>> I am trying to submit a pyFlink job in detached mode using the command:
>>>
>>> ../../flink-1.11.0/bin/flink run -d -py basic_streaming_job.py -j
>>> flink-sql-connector-kafka_2.11-1.11.0.jar
>>>
>>> The jobs are submitted successfully but the command does not return. I
>>> realized that was because I had the following line in
>>> basic_streaming_job.py:
>>>
>>> ten_sec_summaries.get_job_client().get_job_execution_result().result()
>>>
>>> This statement is useful when testing this locally within a minicluster
>>> (using python basic_streaming_job.py) but not needed when the job is
>>> submitted to a cluster.
>>>
>>> So I would like to programmatically detect if the
>>> StreamExecutionEnvironment is a localStreamEnvironment and execute the
>>> above snippet accordingly. How do I do this?
>>>
>>>
>>> Thanks,
>>> Manas
>>>
>>


Re: A couple of question for Stateful Functions

2020-09-02 Thread Till Rohrmann
Hi Dan,

thanks for reaching out to the Flink community. I'm pulling in Gordon and
Igal who will be able to answer your questions.

Cheers,
Till

On Wed, Sep 2, 2020 at 8:22 AM danp  wrote:

> Hi,
>
> Nice to see the progress of Stateful functions.
>
> I have a few questions that I hope you can reply to.
>
> My first question is regarding the newly implemented
> StatefulFunctionDataStreamBuilder.
> Is there anything to pay attention to if one first union a couple of
> streams
> and performs a sort via a keyBy and a KeyedProcessFunction before
> dispatching the messages to via RoutableMessage?
> In this sorting I'm using a mapstate and I guess my question is if one can
> freely mix Flink core with SF's code with regards to performance,
> fault-tolerance, and checkpointing?
>
> I'm unable to use Protobuf so POJO's is going to be processed. Is there a
> large performance impact of using Pojos instead of protos in SF?
>
> Also I wonder if there's going to be a severe performance penalty if I had
> a
> function that was called very often lets say 1000 a second and hold a
> PersistentAppendingBuffer with those objects appended for each message?
> Then
> when the 1001:st message comes or a timetrigger kicks in, I would write
> everything to disk and delete the state. Would the appendingBuffer be
> de-/serialized for each function invocation?
>
> If yes, is there any workaround for this so the data is just held in RAM?
>
> Thanks,
>
> Regards
> Dan
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: FlinkKafkaConsumer problem

2020-09-02 Thread Till Rohrmann
The reason two Flink jobs using a Kafka consumer with the same consumer
group are seeing the same events is that Flink's FlinkKafkaConsumer does
not participate in Kafka's consumer group management. Instead Flink
manually assigns all partitions to the source operators (on a per job
basis). The consumer group will only be used to commit the current
offset to the Kafka brokers.

Cheers,
Till

On Wed, Sep 2, 2020 at 9:42 AM op <520075...@qq.com> wrote:

> hi,
> i am confused about consumer group of FlinkKafkaConsumer,
> i have two applications,with the same code like this:
> //---
>
>  val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>  Env.setRestartStrategy(RestartStrategies.noRestart())
>  val consumerProps = new Properties()
>  consumerProps.put("bootstrap.servers", brokers)
>  consumerProps.put("group.id", "test1234")
>
>  val consumer = new FlinkKafkaConsumer[String](topic,new 
> KafkaStringSchema,consumerProps).setStartFromLatest()
>  Env.addSource(consumer).print()
>  Env.execute()
>
> //---
>
> then i launch both,they have the same topic and  group.id,and when i send 
> some message to the topic,
>
> i find both applications consume all the data, which doesn't behave as a kafka
> consumer group,
>
> can someone tell me why?
>
>


Re: Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Till Rohrmann
Hi Prakhar,

have you enabled HA for your cluster? If yes, then Flink will try to store
the job graph to the configured high-availability.storageDir in order to be
able to recover it. If this operation takes long, then it is either the
filesystem which is slow or storing the pointer in ZooKeeper. If it is the
filesystem, then I would suggest to check whether you have some read/write
quotas which might slow the operation down.

If you haven't enabled HA or persisting the jobGraph is not what takes
long, then the next most likely candidate is the recovery from a previous
checkpoint. Here again, Flink needs to read from the remote storage (in
your case GCS). Depending on the size of the checkpoint and the read
bandwidth, this can be faster or slower. The best way to figure out what
takes long is to share the logs with us so that we can confirm what takes
long.

To sum it up, the job submission is most likely slow because of the
interplay of Flink with the external system (most likely your configured
filesystem). If the filesystem is somewhat throttled, then Flink cannot do
much about it.

What you could try to do is to check whether your jar contains dependencies
which are not needed (e.g. Flink dependencies which are usually provided by
the system). That way you could decrease the size of the jar a bit.

Cheers,
Till

On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur  wrote:

> Hi,
>
> We are currently running Flink 1.9.0. We see a delay of around 20 seconds
> in order to start a job on a session Flink cluster. We start the job using
> Flink's monitoring REST API where our jar is already uploaded on Job
> Manager. Our jar file size is around 200 MB. We are using memory state
> backend having GCS as remote storage.
>
> On running the cluster in debug mode, we observed that generating the plan
> itself takes around 6 seconds and copying job graph from local to the
> remote folder takes around 10 seconds.
>
> We were wondering whether this delay is expected or if it can be reduced
> via tweaking any configuration?
>
> Thank you. Regards
> Prakhar Mathur
>


Re: Fail to deploy Flink on minikube

2020-09-02 Thread Till Rohrmann
Hi art,

could you verify that the jobmanager-service has been started? It looks as
if the name flink-jobmanager is not resolvable. It could also help to know
the Minikube and K8s version you are using.

Cheers,
Till

On Wed, Sep 2, 2020 at 9:50 AM art  wrote:

> Hi,I’m going to deploy flink on minikube referring to
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/kubernetes.html
> ;
> kubectl create -f flink-configuration-configmap.yaml
> kubectl create -f jobmanager-service.yaml
> kubectl create -f jobmanager-session-deployment.yaml
> kubectl create -f taskmanager-session-deployment.yaml
>
> But I got this
>
> 2020-09-02 06:45:42,664 WARN  akka.remote.ReliableDeliverySupervisor
> [] - Association with remote system [
> akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated
> for [50] ms. Reason: [Association failed with [
> akka.tcp://flink@flink-jobmanager:6123]] Caused by:
> [java.net.UnknownHostException: flink-jobmanager: Temporary failure in name
> resolution]
> 2020-09-02 06:45:42,691 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
> not resolve ResourceManager address
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
> retrying in 1 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
> 2020-09-02 06:46:02,731 INFO
>  org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could
> not resolve ResourceManager address
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*,
> retrying in 1 ms: Could not connect to rpc endpoint under address
> akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
> 2020-09-02 06:46:12,731 INFO  akka.remote.transport.ProtocolStateActor
> [] - No response from remote for outbound association.
> Associate timed out after [2 ms].
>
> And when I run the command 'kubectl exec -ti
> flink-taskmanager-74c68c6f48-9tkvd -- /bin/bash’ && ‘ping flink-jobmanager’
> , I find I cannot ping flink-jobmanager from taskmanager
>
> I am new to k8s, can anyone give me some tutorial? Thanks a lot !
>


Fail to deploy Flink on minikube

2020-09-02 Thread art
Hi, I’m going to deploy Flink on minikube referring to
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/deployment/kubernetes.html
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-session-deployment.yaml
kubectl create -f taskmanager-session-deployment.yaml

But I got this

2020-09-02 06:45:42,664 WARN  akka.remote.ReliableDeliverySupervisor
   [] - Association with remote system 
[akka.tcp://flink@flink-jobmanager:6123] has failed, address is now gated for 
[50] ms. Reason: [Association failed with 
[akka.tcp://flink@flink-jobmanager:6123]] Caused by: 
[java.net.UnknownHostException: flink-jobmanager: Temporary failure in name 
resolution]
2020-09-02 06:45:42,691 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
2020-09-02 06:46:02,731 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor   [] - Could not 
resolve ResourceManager address 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*, retrying in 
1 ms: Could not connect to rpc endpoint under address 
akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_*.
2020-09-02 06:46:12,731 INFO  akka.remote.transport.ProtocolStateActor  
   [] - No response from remote for outbound association. Associate 
timed out after [2 ms]. 

And when I run the command 'kubectl exec -ti flink-taskmanager-74c68c6f48-9tkvd 
-- /bin/bash’ && ‘ping flink-jobmanager’ , I find I cannot ping 
flink-jobmanager from taskmanager

I am new to k8s, can anyone give me some tutorial? Thanks a lot !

Job Manager taking long time to upload job graph on remote storage

2020-09-02 Thread Prakhar Mathur
Hi,

We are currently running Flink 1.9.0. We see a delay of around 20 seconds
in order to start a job on a session Flink cluster. We start the job using
Flink's monitoring REST API where our jar is already uploaded on Job
Manager. Our jar file size is around 200 MB. We are using memory state
backend having GCS as remote storage.

On running the cluster in debug mode, we observed that generating the plan
itself takes around 6 seconds and copying job graph from local to the
remote folder takes around 10 seconds.

We were wondering whether this delay is expected or if it can be reduced
via tweaking any configuration?

Thank you. Regards
Prakhar Mathur


FlinkKafkaConsumer problem

2020-09-02 Thread op
hi, i am confused about the consumer group of
FlinkKafkaConsumer. i have two applications, with the same
code like this:
//---
 val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
 Env.setRestartStrategy(RestartStrategies.noRestart())
 val consumerProps = new Properties()
 consumerProps.put("bootstrap.servers", brokers)
 consumerProps.put("group.id", "test1234")

 val consumer = new FlinkKafkaConsumer[String](topic,new 
KafkaStringSchema,consumerProps).setStartFromLatest()
 Env.addSource(consumer).print()
 Env.execute()
//---
then i launch both, they have
the same topic and group.id, and when i send some message to the topic, i find
both applications consume all the data, which doesn't behave as a kafka consumer
group. can someone tell me why?

Re: Editing Rowtime for SQL Table

2020-09-02 Thread Timo Walther
Yes, the new TableSource API allows to emit retractions. However, it 
does not give you direct access to DataStream API.


FLIP-136 [1] might help you in the near future. We hope it can be part 
of 1.12.


Regards,
Timo

[1] 
https://lists.apache.org/thread.html/r62b47ec6812ddbafed65ac79e31ca0305099893559f1e5a991dee550%40%3Cdev.flink.apache.org%3E
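
For completeness, here is a minimal sketch (Flink 1.11-style API) of the DataStream
round trip discussed below, assuming V0 is append-only (otherwise toRetractStream
would be needed) and that extractUtimeMillis() is a hypothetical helper that pulls
the epoch millis out of the utime column; tEnv and V0 are the same objects as in
the quoted snippet:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

// Table -> DataStream, then re-assign StreamRecord timestamps and watermarks from utime.
DataStream<Row> rows = tEnv.toAppendStream(V0, Row.class)
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((row, previousTimestamp) -> extractUtimeMillis(row)));

// DataStream -> Table again, declaring utime as the new rowtime attribute.
Table withNewRowtime = tEnv.fromDataStream(
    rows, $("str1"), $("int1"), $("utime").rowtime(), $("itime"));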


On 01.09.20 22:55, Satyam Shekhar wrote:

Thanks for your replies Matthias and Timo.

Converting the Table to DataStream, assigning a new Watermark & Rowtime 
attribute, and converting back to Table makes sense. One challenge with 
that approach is that Table to DataStream conversion could emit 
retractable data stream, however, I think, that can now be handled with 
the new TableSource API (in 1.11) that allows TableSource to emit 
retractions.


I'll try this approach when I migrate to the new API and report back.

Regards,
Satyam

On Tue, Sep 1, 2020 at 4:46 AM Timo Walther wrote:


Hi Satyam,

Matthias is right. A rowtime attribute cannot be modified and needs
to be passed "as is" through the pipeline. The only exceptions are
if a newer rowtime is offered such as `TUMBLE_ROWTIME` or
`MATCH_ROWTIME`. In your case, you need to define utime as the time
attribute. If this is not possible, you either express the
computation in regular SQL (with non-streaming optimizations) or you
go to the DataStream API, prepare the table (assign new watermark and
StreamRecord timestamp there) and go back to Table API.

I hope this helps.

Regards,
Timo

On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl <matth...@ververica.com> wrote:

Hi Satyam,
Thanks for your post. Unfortunately, it looks like you cannot
change the rowtime column here. The rowtime is strongly coupled
with the Watermarks feature. By changing the rowtime column we
cannot ensure that the watermarks are still aligned as Fabian
mentioned in [1].

@Timo Walther  : Could you verify my
findings?

Best,
Matthias

[1]

https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation

On Mon, Aug 31, 2020 at 6:44 PM Satyam Shekhar <satyamshek...@gmail.com> wrote:

Hello,

I use Flink for continuous evaluation of SQL queries on
streaming data. One of the use cases requires us to run
recursive SQL queries. I am unable to find a way to edit
rowtime time attribute of the intermediate result table.

For example, let's assume that there is a table T0 with schema -
root
  |-- str1: STRING
  |-- int1: BIGINT
  |-- utime: TIMESTAMP(3)
  |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0 -
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime
from T0");

I wish to change the rowtime of V0 from itime to utime. I
tried doing -

V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception -

org.apache.flink.table.api.ValidationException: Window
properties can only be used on windowed tables.
at

org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at

org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at

org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at

org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at

org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at
java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
~[na:na]
at

org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at

org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207)
~[flink-table-api-java-1.11.1.jar:1.11.1]
at

org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475)
~[flink-table-api-java-1.11.1.jar:1.11.1]