Re: on YARN question

2020-04-09 Thread Yangze Guo
Do you mean to run it in detached mode? If so, you could add "-d".

Best,
Yangze Guo

On Fri, Apr 10, 2020 at 1:05 PM Ethan Li  wrote:
>
> I am not a Flink expert. Just out of curiosity,
>
> I am seeing
>
> “YARN application has been deployed successfully“
>
> Does it not mean it’s working properly?
>
>
> Best,
> Ethan
>
> On Apr 9, 2020, at 23:01, 罗杰  wrote:
>
> 
> Hello, could you please tell me how to solve the problem that when I use
> yarn-session.sh, it gets stuck and will not proceed past the following point?
> Hadoop 2.7.2, Flink 1.10.0
> flink/lib/ contains flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> [root@hadoop131 bin]# ./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test 
> -d
> 2020-04-10 11:00:35,434 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, hadoop131
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.size, 1024m
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.memory.process.size, 1568m
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: parallelism.default, 1
> 2020-04-10 11:00:35,438 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-04-10 11:00:35,553 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-04-10 11:00:36,141 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2020-04-10 11:00:36,323 INFO  
> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop user 
> set to root (auth:SIMPLE)
> 2020-04-10 11:00:36,509 INFO  
> org.apache.flink.runtime.security.modules.JaasModule  - Jaas file 
> will be created as /tmp/jaas-9182197754252132172.conf.
> 2020-04-10 11:00:36,554 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory ('/opt/module/flink-1.10.0/conf') 
> already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2020-04-10 11:00:36,653 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at hadoop132/192.168.15.132:8032
> 2020-04-10 11:00:36,903 INFO  
> org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The 
> derived from fraction jvm overhead memory (156.800mb (164416719 bytes)) is 
> less than its min value 192.000mb (201326592 bytes), min value will be used 
> instead
> 2020-04-10 11:00:37,048 WARN  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment 
> variable is set. The Flink YARN Client needs one of these to be set to 
> properly load the Hadoop configuration for accessing YARN.
> 2020-04-10 11:00:37,109 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1568, 
> slotsPerTaskManager=1}
> 2020-04-10 11:00:50,693 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1586487382351_0001
> 2020-04-10 11:00:51,093 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1586487382351_0001
> 2020-04-10 11:00:51,093 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2020-04-10 11:00:51,096 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2020-04-10 11:01:04,140 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2020-04-10 11:01:04,141 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface hadoop133:40677 of application 
> 'application_1586487382351_0001'.
> JobManager Web Interface: http://hadoop133:40677
>


Re: on YARN question

2020-04-09 Thread Ethan Li
I am not a Flink expert. Just out of curiosity,

I am seeing 

“YARN application has been deployed successfully“

Does it not mean it’s working properly? 


Best,
Ethan 

> On Apr 9, 2020, at 23:01, 罗杰  wrote:
> 
> 
> Hello, could you please tell me how to solve the problem that when I use
> yarn-session.sh, it gets stuck and will not proceed past the following point?
> Hadoop 2.7.2, Flink 1.10.0
> flink/lib/ contains flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
> [root@hadoop131 bin]# ./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test 
> -d
> 2020-04-10 11:00:35,434 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, hadoop131
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.size, 1024m
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.memory.process.size, 1568m
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-04-10 11:00:35,437 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: parallelism.default, 1
> 2020-04-10 11:00:35,438 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-04-10 11:00:35,553 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - Found Yarn properties file under /tmp/.yarn-properties-root.
> 2020-04-10 11:00:36,141 WARN  org.apache.hadoop.util.NativeCodeLoader 
>   - Unable to load native-hadoop library for your platform... 
> using builtin-java classes where applicable
> 2020-04-10 11:00:36,323 INFO  
> org.apache.flink.runtime.security.modules.HadoopModule- Hadoop user 
> set to root (auth:SIMPLE)
> 2020-04-10 11:00:36,509 INFO  
> org.apache.flink.runtime.security.modules.JaasModule  - Jaas file 
> will be created as /tmp/jaas-9182197754252132172.conf.
> 2020-04-10 11:00:36,554 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli   
>   - The configuration directory ('/opt/module/flink-1.10.0/conf') 
> already contains a LOG4J config file.If you want to use logback, then please 
> delete or rename the log configuration file.
> 2020-04-10 11:00:36,653 INFO  org.apache.hadoop.yarn.client.RMProxy   
>   - Connecting to ResourceManager at hadoop132/192.168.15.132:8032
> 2020-04-10 11:00:36,903 INFO  
> org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The 
> derived from fraction jvm overhead memory (156.800mb (164416719 bytes)) is 
> less than its min value 192.000mb (201326592 bytes), min value will be used 
> instead
> 2020-04-10 11:00:37,048 WARN  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment 
> variable is set. The Flink YARN Client needs one of these to be set to 
> properly load the Hadoop configuration for accessing YARN.
> 2020-04-10 11:00:37,109 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Cluster specification: 
> ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1568, 
> slotsPerTaskManager=1}
> 2020-04-10 11:00:50,693 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Submitting application master application_1586487382351_0001
> 2020-04-10 11:00:51,093 INFO  
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
> application application_1586487382351_0001
> 2020-04-10 11:00:51,093 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Waiting for the cluster to be allocated
> 2020-04-10 11:00:51,096 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Deploying cluster, current state ACCEPTED
> 2020-04-10 11:01:04,140 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - YARN application has been deployed successfully.
> 2020-04-10 11:01:04,141 INFO  org.apache.flink.yarn.YarnClusterDescriptor 
>   - Found Web Interface hadoop133:40677 of application 
> 'application_1586487382351_0001'.
> JobManager Web Interface: http://hadoop133:40677
> 


on YARN question

2020-04-09 Thread 罗杰
Hello, could you please tell me how to solve the problem that when I use
yarn-session.sh, it gets stuck and will not proceed past the following point?
Hadoop 2.7.2, Flink 1.10.0
flink/lib/ contains flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
[root@hadoop131 bin]# ./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
2020-04-10 11:00:35,434 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, hadoop131
2020-04-10 11:00:35,437 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-04-10 11:00:35,437 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-04-10 11:00:35,437 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.memory.process.size, 1568m
2020-04-10 11:00:35,437 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-04-10 11:00:35,437 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2020-04-10 11:00:35,438 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-04-10 11:00:35,553 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-root.
2020-04-10 11:00:36,141 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-04-10 11:00:36,323 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to root (auth:SIMPLE)
2020-04-10 11:00:36,509 INFO  org.apache.flink.runtime.security.modules.JaasModule          - Jaas file will be created as /tmp/jaas-9182197754252132172.conf.
2020-04-10 11:00:36,554 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The configuration directory ('/opt/module/flink-1.10.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2020-04-10 11:00:36,653 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at hadoop132/192.168.15.132:8032
2020-04-10 11:00:36,903 INFO  org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The derived from fraction jvm overhead memory (156.800mb (164416719 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2020-04-10 11:00:37,048 WARN  org.apache.flink.yarn.YarnClusterDescriptor                   - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2020-04-10 11:00:37,109 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1568, slotsPerTaskManager=1}
2020-04-10 11:00:50,693 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Submitting application master application_1586487382351_0001
2020-04-10 11:00:51,093 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1586487382351_0001
2020-04-10 11:00:51,093 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Waiting for the cluster to be allocated
2020-04-10 11:00:51,096 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Deploying cluster, current state ACCEPTED
2020-04-10 11:01:04,140 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - YARN application has been deployed successfully.
2020-04-10 11:01:04,141 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Found Web Interface hadoop133:40677 of application 'application_1586487382351_0001'.
JobManager Web Interface: http://hadoop133:40677

Re: Re: fink sql client not able to read parquet format table

2020-04-09 Thread Jingsong Li
Hi Lei,

I think the reason is that our `HiveMapredSplitReader` does not support
name-mapping reads for the parquet format.
Can you create a JIRA for tracking this?

Best,
Jingsong Lee

On Fri, Apr 10, 2020 at 9:42 AM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> I am using Hive 3.1.1
> The table has many fields; each field corresponds to a field in the
> RobotUploadData0101 class.
>
> CREATE TABLE `robotparquet`(`robotid` int,   `framecount` int,
> `robottime` bigint,   `robotpathmode` int,   `movingmode` int,
> `submovingmode` int,   `xlocation` int,   `ylocation` int,
> `robotradangle` int,   `velocity` int,   `acceleration` int,
> `angularvelocity` int,   `angularacceleration` int,   `literangle` int,
> `shelfangle` int,   `onloadshelfid` int,   `rcvdinstr` int,   `sensordist`
> int,   `pathstate` int,   `powerpresent` int,   `neednewpath` int,
> `pathelenum` int,   `taskstate` int,   `receivedtaskid` int,
> `receivedcommcount` int,   `receiveddispatchinstr` int,
> `receiveddispatchcount` int,   `subtaskmode` int,   `versiontype` int,
> `version` int,   `liftheight` int,   `codecheckstatus` int,
> `cameraworkmode` int,   `backrimstate` int,   `frontrimstate` int,
> `pathselectstate` int,   `codemisscount` int,   `groundcameraresult` int,
> `shelfcameraresult` int,   `softwarerespondframe` int,   `paramstate` int,
>   `pilotlamp` int,   `codecount` int,   `dist2waitpoint` int,
> `targetdistance` int,   `obstaclecount` int,   `obstacleframe` int,
> `cellcodex` int,   `cellcodey` int,   `cellangle` int,   `shelfqrcode` int,
>   `shelfqrangle` int,   `shelfqrx` int,   `shelfqry` int,
> `trackthetaerror` int,   `tracksideerror` int,   `trackfuseerror` int,
> `lifterangleerror` int,   `lifterheighterror` int,   `linearcmdspeed` int,
>   `angluarcmdspeed` int,   `liftercmdspeed` int,   `rotatorcmdspeed` int)
> PARTITIONED BY (`hour` string) STORED AS parquet;
>
>
> Thanks,
> Lei
> --
> wangl...@geekplus.com.cn
>
>
> *From:* Jingsong Li 
> *Date:* 2020-04-09 21:45
> *To:* wangl...@geekplus.com.cn
> *CC:* Jark Wu ; lirui ; user
> 
> *Subject:* Re: Re: fink sql client not able to read parquet format table
> Hi Lei,
>
> Which hive version did you use?
> Can you share the complete hive DDL?
>
> Best,
> Jingsong Lee
>
> On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> I am using the newest 1.10 blink planner.
>>
>> Perhaps it is because of the method I used to write the parquet file.
>>
>> I receive a Kafka message, transform each message into a Java class object,
>> write the object to HDFS using StreamingFileSink, and add the HDFS path as a
>> partition of the Hive table.
>>
>> No matter what the order of the field descriptions in the Hive DDL statement
>> is, the Hive client will work, as long as the field names are the same as the
>> Java object field names.
>> But the Flink SQL client will not work.
>>
>> DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
>> final StreamingFileSink<RobotUploadData0101> sink;
>> sink = StreamingFileSink
>>     .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
>>         ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
>>     .build();
>>
>> For example
>> RobotUploadData0101 has two fields:  robotId int, robotTime long
>>
>> CREATE TABLE `robotparquet`(  `robotid` int,  `robottime` bigint ) and
>> CREATE TABLE `robotparquet`(  `robottime` bigint,   `robotid` int)
>> are the same for the Hive client, but different for the Flink SQL client.
>>
>> Is this expected behavior?
>>
>> Thanks,
>> Lei
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>
>> *From:* Jark Wu 
>> *Date:* 2020-04-09 14:48
>> *To:* wangl...@geekplus.com.cn; Jingsong Li ;
>> lirui 
>> *CC:* user 
>> *Subject:* Re: fink sql client not able to read parquet format table
>> Hi Lei,
>>
>> Are you using the newest 1.10 blink planner?
>>
>> I'm not familiar with Hive and parquet, but I know @Jingsong Li
>>  and @li...@apache.org  are
>> experts on this. Maybe they can help on this question.
>>
>> Best,
>> Jark
>>
>> On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn <
>> wangl...@geekplus.com.cn> wrote:
>>
>>>
>>> Hive table stored as parquet.
>>>
>>> Under hive client:
>>> hive> select robotid from robotparquet limit 2;
>>> OK
>>> 1291097
>>> 1291044
>>>
>>>
>>> But under flink sql-client the result is 0
>>> Flink SQL> select robotid  from robotparquet limit 2;
>>>   robotid
>>>  0
>>>  0
>>>
>>> Any insight on this?
>>>
>>> Thanks,
>>> Lei
>>>
>>>
>>>
>>> --
>>> wangl...@geekplus.com.cn
>>>
>>>
>
> --
> Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee


[Stateful Functions] Using statefun for E2E testing

2020-04-09 Thread Oytun Tez
Hi there,

Today we were designing a test for a workflow that involved 3 different
systems talking to each other asynchronously. My colleague came up with the
idea that we could use Flink for E2E testing, which we got excited about.

We came up with a quick implementation within our existing Flink application,
and after some hours of debugging this and that, everything actually worked
very nicely. We triggered the initial actions within Functions, other
Functions kept state for CEP-like logic (can we use Flink CEP directly?),
and some events triggered assorted validations via async API calls and so on...

Has anyone used a similar approach? This is just a general question to find
resources about integration testing via Flink.



 --

Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


Re: Re: fink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn

I am using Hive 3.1.1 
The table has many fields; each field corresponds to a field in the
RobotUploadData0101 class.

CREATE TABLE `robotparquet`(`robotid` int,   `framecount` int,   `robottime` 
bigint,   `robotpathmode` int,   `movingmode` int,   `submovingmode` int,   
`xlocation` int,   `ylocation` int,   `robotradangle` int,   `velocity` int,   
`acceleration` int,   `angularvelocity` int,   `angularacceleration` int,   
`literangle` int,   `shelfangle` int,   `onloadshelfid` int,   `rcvdinstr` int, 
  `sensordist` int,   `pathstate` int,   `powerpresent` int,   `neednewpath` 
int,   `pathelenum` int,   `taskstate` int,   `receivedtaskid` int,   
`receivedcommcount` int,   `receiveddispatchinstr` int,   
`receiveddispatchcount` int,   `subtaskmode` int,   `versiontype` int,   
`version` int,   `liftheight` int,   `codecheckstatus` int,   `cameraworkmode` 
int,   `backrimstate` int,   `frontrimstate` int,   `pathselectstate` int,   
`codemisscount` int,   `groundcameraresult` int,   `shelfcameraresult` int,   
`softwarerespondframe` int,   `paramstate` int,   `pilotlamp` int,   
`codecount` int,   `dist2waitpoint` int,   `targetdistance` int,   
`obstaclecount` int,   `obstacleframe` int,   `cellcodex` int,   `cellcodey` 
int,   `cellangle` int,   `shelfqrcode` int,   `shelfqrangle` int,   `shelfqrx` 
int,   `shelfqry` int,   `trackthetaerror` int,   `tracksideerror` int,   
`trackfuseerror` int,   `lifterangleerror` int,   `lifterheighterror` int,   
`linearcmdspeed` int,   `angluarcmdspeed` int,   `liftercmdspeed` int,   
`rotatorcmdspeed` int) PARTITIONED BY (`hour` string) STORED AS parquet; 


Thanks,
Lei


wangl...@geekplus.com.cn 

 
From: Jingsong Li
Date: 2020-04-09 21:45
To: wangl...@geekplus.com.cn
CC: Jark Wu; lirui; user
Subject: Re: Re: fink sql client not able to read parquet format table
Hi Lei,

Which hive version did you use?
Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn 
 wrote:

I am using the newest 1.10 blink planner.

Perhaps it is because of the method I used to write the parquet file.

I receive a Kafka message, transform each message into a Java class object,
write the object to HDFS using StreamingFileSink, and add the HDFS path as a
partition of the Hive table.

No matter what the order of the field descriptions in the Hive DDL statement
is, the Hive client will work, as long as the field names are the same as the
Java object field names.
But the Flink SQL client will not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink;
sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();
For example 
RobotUploadData0101 has two fields:  robotId int, robotTime long

CREATE TABLE `robotparquet`(  `robotid` int,  `robottime` bigint ) and 
CREATE TABLE `robotparquet`(  `robottime` bigint,   `robotid` int)
are the same for the Hive client, but different for the Flink SQL client.

Is this expected behavior?

Thanks,
Lei



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: fink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



-- 
Best, Jingsong Lee


Re: End to End Latency Tracking in flink

2020-04-09 Thread Lu Niu
An operator like the one below will expose the lag between the current time
and the event time passing through the operator. I add it after the source and
before the sink, and calculate sink_delay - source_delay in Grafana. Would
that be a generic solution to the problem?
```
public class EmitLagOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  private transient long delay;

  public EmitLagOperator() {
    chainingStrategy = ChainingStrategy.ALWAYS;
  }

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
    // Lag = current processing time minus the element's event timestamp.
    long now = getProcessingTimeService().getCurrentProcessingTime();
    delay = now - element.getTimestamp();
    output.collect(element);
  }

  @Override
  public void open() throws Exception {
    super.open();
    // Expose the last observed lag as a "delay" gauge on this operator's metric group.
    getRuntimeContext()
        .getMetricGroup()
        .gauge("delay", new Gauge<Long>() {
          @Override
          public Long getValue() {
            return delay;
          }
        });
  }
}
```
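
For reference, a minimal sketch of how such an operator could be attached after the
source and before the sink via `DataStream#transform` (the `Event` type, `MySource`,
`MySink`, and the operator names are illustrative assumptions, not part of the
original mail):
```
// Hypothetical wiring; "Event", MySource, MySink, and the operator names are placeholders.
DataStream<Event> events = env.addSource(new MySource());

DataStream<Event> tagged = events
    .transform("source_delay", events.getType(), new EmitLagOperator<Event>());

// ... business logic ...

tagged
    .transform("sink_delay", tagged.getType(), new EmitLagOperator<Event>())
    .addSink(new MySink());

// Each "delay" gauge is reported under its operator name, so
// sink_delay - source_delay can be charted in Grafana as described above.
```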

On Wed, Apr 1, 2020 at 7:59 PM zoudan  wrote:

> Hi,
> I think we could add a latency metric for each operator, which would reflect
> the consumption ability of each operator.
>
> Best,
> Dan Zou
>
>
> On Mar 30, 2020, at 18:19, Guanghui Zhang  wrote:
>
> Hi.
> At the Flink source connector, you can emit a $source_current_time - $event_time
> metric.
> At the same time, at the Flink sink connector, you can emit a $sink_current_time
> - $event_time metric.
> Then ($sink_current_time - $event_time) - ($source_current_time - $event_time)
> = $sink_current_time - $source_current_time gives the end-to-end latency.
>
> Oscar Westra van Holthe - Kind  wrote on Monday, March 30, 2020, at 5:15 PM:
>
>> On Mon, 30 Mar 2020 at 05:08, Lu Niu  wrote:
>>
>>> $current_processing - $event_time works for event time. How about
>>> processing time? Is there a good way to measure the latency?
>>>
>>
>> To measure latency you'll need some way to determine the time spent
>> between the start and end of your pipeline.
>>
>> To measure latency when using processing time, you'll need to partially
>> use ingestion time. That is, you'll need to add the 'current' processing
>> time as soon as messages are ingested.
>>
>> With it, you can then use the $current_processing - $ingest_time
>> solution that was already mentioned.
>>
>> Kind regards,
>> Oscar
>>
>> --
>> Oscar Westra van Holthe - Kind
>>
>
>
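
A minimal sketch of the ingestion-time variant Oscar describes above, for pipelines
that run purely on processing time (the `String` payload, the metric name, and the
operator placement are illustrative assumptions):
```
// Stamp each record with the processing time at ingestion...
DataStream<Tuple2<String, Long>> stamped = source
    .map(new MapFunction<String, Tuple2<String, Long>>() {
      @Override
      public Tuple2<String, Long> map(String value) {
        return Tuple2.of(value, System.currentTimeMillis());
      }
    });

// ...and just before the sink, expose (now - ingestTime) as a gauge.
DataStream<String> unwrapped = stamped
    .map(new RichMapFunction<Tuple2<String, Long>, String>() {
      private transient long delay;

      @Override
      public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup().gauge("e2eDelay", (Gauge<Long>) () -> delay);
      }

      @Override
      public String map(Tuple2<String, Long> value) {
        delay = System.currentTimeMillis() - value.f1;
        return value.f0;
      }
    });
```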


Re: [PROPOSAL] Contribute training materials to Apache Flink

2020-04-09 Thread Seth Wiesman
Hi David,

+1 to add to the project.

I agree that flink.apache.org and flink playgrounds respectively are the
best places to host this content.

On Thu, Apr 9, 2020 at 2:56 PM Niels Basjes  wrote:

> Hi,
>
> Sounds like a very nice thing to have as part of the project ecosystem.
>
> Niels
>
> On Thu, Apr 9, 2020 at 8:10 PM David Anderson  wrote:
>
>> Dear Flink Community!
>>
>> For some time now Ververica has been hosting some freely available Apache
>> Flink training materials at https://training.ververica.com. This includes
>> tutorial content covering the core concepts of the DataStream API, and
>> hands-on exercises that accompany those explanations.
>>
>> Website: https://training.ververica.com
>> Website repo: https://github.com/dataArtisans/flink-training
>> Exercises: repo: https://github.com/ververica/flink-training-exercises
>>
>> We would like to contribute this training content to Apache Flink. By
>> doing
>> so, we hope to make it even easier for folks to get started with Flink.
>> Especially during this time when so many are working from home, we'd like
>> to get this self-paced training course out where more people will see it.
>>
>> If the community wants these training materials, then this also raises the
>> question of where to put them. We are thinking it would be best to
>> integrate the website content into flink.apache.org, and to add the
>> exercises to flink-playgrounds -- but these points can be discussed
>> separately once we've established that the community wants this content.
>>
>> Looking forward to hearing what you think!
>>
>> Best regards,
>> David
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: [PROPOSAL] Contribute training materials to Apache Flink

2020-04-09 Thread Niels Basjes
Hi,

Sounds like a very nice thing to have as part of the project ecosystem.

Niels

On Thu, Apr 9, 2020 at 8:10 PM David Anderson  wrote:

> Dear Flink Community!
>
> For some time now Ververica has been hosting some freely available Apache
> Flink training materials at https://training.ververica.com. This includes
> tutorial content covering the core concepts of the DataStream API, and
> hands-on exercises that accompany those explanations.
>
> Website: https://training.ververica.com
> Website repo: https://github.com/dataArtisans/flink-training
> Exercises: repo: https://github.com/ververica/flink-training-exercises
>
> We would like to contribute this training content to Apache Flink. By doing
> so, we hope to make it even easier for folks to get started with Flink.
> Especially during this time when so many are working from home, we'd like
> to get this self-paced training course out where more people will see it.
>
> If the community wants these training materials, then this also raises the
> question of where to put them. We are thinking it would be best to
> integrate the website content into flink.apache.org, and to add the
> exercises to flink-playgrounds -- but these points can be discussed
> separately once we've established that the community wants this content.
>
> Looking forward to hearing what you think!
>
> Best regards,
> David
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: [PROPOSAL] Contribute training materials to Apache Flink

2020-04-09 Thread hemant singh
Hello David,

This is a nice move. I'm pretty sure that the more resources there are in one
place, the better it is for reference, especially for starters.

Thanks,
Hemant

On Thu, Apr 9, 2020 at 11:40 PM David Anderson  wrote:

> Dear Flink Community!
>
> For some time now Ververica has been hosting some freely available Apache
> Flink training materials at https://training.ververica.com. This includes
> tutorial content covering the core concepts of the DataStream API, and
> hands-on exercises that accompany those explanations.
>
> Website: https://training.ververica.com
> Website repo: https://github.com/dataArtisans/flink-training
> Exercises: repo: https://github.com/ververica/flink-training-exercises
>
> We would like to contribute this training content to Apache Flink. By
> doing so, we hope to make it even easier for folks to get started with
> Flink. Especially during this time when so many are working from home, we'd
> like to get this self-paced training course out where more people will see
> it.
>
> If the community wants these training materials, then this also raises the
> question of where to put them. We are thinking it would be best to
> integrate the website content into flink.apache.org, and to add the
> exercises to flink-playgrounds -- but these points can be discussed
> separately once we've established that the community wants this content.
>
> Looking forward to hearing what you think!
>
> Best regards,
> David
>


[PROPOSAL] Contribute training materials to Apache Flink

2020-04-09 Thread David Anderson
Dear Flink Community!

For some time now Ververica has been hosting some freely available Apache
Flink training materials at https://training.ververica.com. This includes
tutorial content covering the core concepts of the DataStream API, and
hands-on exercises that accompany those explanations.

Website: https://training.ververica.com
Website repo: https://github.com/dataArtisans/flink-training
Exercises: repo: https://github.com/ververica/flink-training-exercises

We would like to contribute this training content to Apache Flink. By doing
so, we hope to make it even easier for folks to get started with Flink.
Especially during this time when so many are working from home, we'd like
to get this self-paced training course out where more people will see it.

If the community wants these training materials, then this also raises the
question of where to put them. We are thinking it would be best to
integrate the website content into flink.apache.org, and to add the
exercises to flink-playgrounds -- but these points can be discussed
separately once we've established that the community wants this content.

Looking forward to hearing what you think!

Best regards,
David


Re: Passing checkpoint lock object to StreamSourceContexts.getSourceContext after StreamTask.getCheckpointLock deprecation

2020-04-09 Thread Yuval Itzchakov
One thing that has just occurred to me is that the context is later used to
emit watermarks and send elements downstream using ctx.collect. Perhaps all
these operations should now be switched to the new mailbox executor instead?

On Thu, Apr 9, 2020 at 8:52 PM Yuval Itzchakov  wrote:

> Hi,
>
> I have an implementation of a custom source, which uses
> StreamSourceContexts.getSourceContext (
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L46)
> which has a checkpointLock argument that's used for watermark emission and
> possibly additional work.
>
> Flink 1.10 marks StreamTask.getCheckpointLock as deprecated and points to
> using YieldingOperatorFactory instead.
>
> Question is, with the above method now being deprecated, which object
> should we pass to getSourceContext for the required checkpointLock?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Best Regards,
Yuval Itzchakov.
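
For background, a minimal sketch of where the checkpoint lock conventionally comes
from in a user-facing SourceFunction (`MyRecord` and `fetchNext()` are hypothetical
placeholders; this does not answer which object to pass to
StreamSourceContexts.getSourceContext after the deprecation):
```
// Sketch of the conventional pattern only; MyRecord and fetchNext() are placeholders.
public class MySource implements SourceFunction<MyRecord> {
  private volatile boolean running = true;

  @Override
  public void run(SourceContext<MyRecord> ctx) throws Exception {
    while (running) {
      MyRecord record = fetchNext(); // placeholder for the actual polling logic
      // Emission and watermark generation are guarded by the checkpoint lock
      // so they do not interleave with checkpointing.
      synchronized (ctx.getCheckpointLock()) {
        ctx.collect(record);
        ctx.emitWatermark(new Watermark(record.getTimestamp()));
      }
    }
  }

  @Override
  public void cancel() {
    running = false;
  }
}
```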


Passing checkpoint lock object to StreamSourceContexts.getSourceContext after StreamTask.getCheckpointLock deprecation

2020-04-09 Thread Yuval Itzchakov
Hi,

I have an implementation of a custom source, which uses
StreamSourceContexts.getSourceContext (
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L46)
which has a checkpointLock argument that's used for watermark emission and
possibly additional work.

Flink 1.10 marks StreamTask.getCheckpointLock as deprecated and points to
using YieldingOperatorFactory instead.

Question is, with the above method now being deprecated, which object
should we pass to getSourceContext for the required checkpointLock?

-- 
Best Regards,
Yuval Itzchakov.


Re: FlinkRuntimeException: Unexpected list element deserialization failure

2020-04-09 Thread Yun Tang
Hi

I think you have missed the nested "caused by" exception [1], which would tell us
the root cause. Could you please check the full stack trace?

[1] 
https://github.com/apache/flink/blob/6846522ed67343c665f7e1dd02b7c06c05c1eb1d/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L151

Best
Yun Tang


From: anaray 
Sent: Friday, April 10, 2020 1:25
To: user@flink.apache.org 
Subject: FlinkRuntimeException: Unexpected list element deserialization failure


Hi flink team,

I see the exception below. What could be the reason for the failure? Please
share your thoughts.

Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected list
element deserialization failure
at
org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:153)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:452)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


FlinkRuntimeException: Unexpected list element deserialization failure

2020-04-09 Thread anaray


Hi flink team,

I see the exception below. What could be the reason for the failure? Please
share your thoughts.

Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected list
element deserialization failure
at
org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeNextElement(RocksDBListState.java:153)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.deserializeList(RocksDBListState.java:137)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:112)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:61)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:452)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
... 7 more

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink job didn't restart when a task failed

2020-04-09 Thread Till Rohrmann
For future reference, here is the issue to track the reconciliation logic
[1].

[1] https://issues.apache.org/jira/browse/FLINK-17075

Cheers,
Till

On Thu, Apr 9, 2020 at 6:47 PM Till Rohrmann  wrote:

> Hi Bruce,
>
> what you are describing sounds indeed quite bad. Quite hard to say whether
> we fixed such an issue in 1.10. It is definitely worth a try to upgrade,
> though.
>
> In order to further debug the problem, it would be really great if you
> could provide us with the log files of the JobMaster and the TaskExecutor.
> Ideally on debug log level if you have them.
>
> One thing which we wanted to add is sending the current task statuses as
> part of the heartbeat from the TM to the JM. Having this information would
> allow us to reconcile a situation like you are describing.
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> this indeed seems very strange!
>>
>> @Gary Could you maybe have a look at this since you work/worked quite a
>> bit on the scheduler?
>>
>> Best,
>> Aljoscha
>>
>> On 09.04.20 05:46, Hanson, Bruce wrote:
>> > Hello Flink folks:
>> >
>> > We had a problem with a Flink job the other day that I haven’t seen
>> before. One task encountered a failure and switched to FAILED (see the full
>> exception below). After the failure, the task said it was notifying the Job
>> Manager:
>> >
>> > 2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283]
>> level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor -
>> Un-registering task and sending final execution state FAILED to JobManager
>> for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.
>> >
>> > But I see no evidence that the Job Manager got the message. I would
>> expect with this type of failure that the Job Manager would restart the
>> job. In this case, the job carried on, hobbled, until it stopped
>> processing data and our user had to manually restart the job. The job also
>> started experiencing checkpoint timeouts on every checkpoint due to this
>> operator stopping.
>> >
>> > Had the job restarted when this happened, I believe everything would
>> have been ok as the job had an appropriate restart strategy in place. The
>> Task Manager that this task was running on remained healthy and was
>> actively processing other tasks.
>> >
>> > It seems like this is some kind of a bug. Is this something anyone has
>> seen before? Could it be something that has been fixed if we went to Flink
>> 1.10?
>> >
>> > We are running Flink 1.7.2. I know it’s rather old now. We run a
>> managed environment where users can run their jobs, and are in the process
>> of upgrading to 1.10.
>> >
>> > This is the full exception that started the problem:
>> >
>> > 2020-04-06 08:21:04.297 [FOG_PREDICTION_FUNCTION (15/20)] level=INFO
>> org.apache.flink.runtime.taskmanager.Task  - FOG_PREDICTION_FUNCTION
>> (15/20) (3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED.
>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>> Connection timed out (connection to '/100.112.98.121:36256')
>> > at org.apache.flink.runtime.io
>> .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>> > at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>> > 

Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Salva Alcántara
Sounds like a plan, Arvid! Taking note of it, this is gold!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink job didn't restart when a task failed

2020-04-09 Thread Till Rohrmann
Hi Bruce,

what you are describing sounds indeed quite bad. Quite hard to say whether
we fixed such an issue in 1.10. It is definitely worth a try to upgrade,
though.

In order to further debug the problem, it would be really great if you
could provide us with the log files of the JobMaster and the TaskExecutor.
Ideally on debug log level if you have them.

One thing which we wanted to add is sending the current task statuses as
part of the heartbeat from the TM to the JM. Having this information would
allow us to reconcile a situation like you are describing.

Cheers,
Till

On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek  wrote:

> Hi,
>
> this indeed seems very strange!
>
> @Gary Could you maybe have a look at this since you work/worked quite a
> bit on the scheduler?
>
> Best,
> Aljoscha
>
> On 09.04.20 05:46, Hanson, Bruce wrote:
> > Hello Flink folks:
> >
> > We had a problem with a Flink job the other day that I haven’t seen
> before. One task encountered a failure and switched to FAILED (see the full
> exception below). After the failure, the task said it was notifying the Job
> Manager:
> >
> > 2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283]
> level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor -
> Un-registering task and sending final execution state FAILED to JobManager
> for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.
> >
> > But I see no evidence that the Job Manager got the message. I would
> expect with this type of failure that the Job Manager would restart the
> job. In this case, the job carried on, hobbled, until it stopped
> processing data and our user had to manually restart the job. The job also
> started experiencing checkpoint timeouts on every checkpoint due to this
> operator stopping.
> >
> > Had the job restarted when this happened, I believe everything would
> have been ok as the job had an appropriate restart strategy in place. The
> Task Manager that this task was running on remained healthy and was
> actively processing other tasks.
> >
> > It seems like this is some kind of a bug. Is this something anyone has
> seen before? Could it be something that has been fixed if we went to Flink
> 1.10?
> >
> > We are running Flink 1.7.2. I know it’s rather old now. We run a managed
> environment where users can run their jobs, and are in the process of
> upgrading to 1.10.
> >
> > This is the full exception that started the problem:
> >
> > 2020-04-06 08:21:04.297 [FOG_PREDICTION_FUNCTION (15/20)] level=INFO
> org.apache.flink.runtime.taskmanager.Task  - FOG_PREDICTION_FUNCTION
> (15/20) (3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED.
> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> Connection timed out (connection to '/100.112.98.121:36256')
> > at org.apache.flink.runtime.io
> .network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)
> > at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
> > a

Re: Inserting nullable data into NOT NULL columns

2020-04-09 Thread Timo Walther

Hi Gyula,

Some disclaimer: the type system rework is still ongoing and there are a
couple of known issues and missing end-to-end tests around this topic.


I would therefore recommend to declare the sink as `STRING NULL` for now.

Can you open an issue for your concrete use case with some example
source/query/sink SQL? I'm happy to look into this.


Actually, `NULLIF()` should do the trick in the query but unfortunately 
the current Calcite behavior is not what one would expect.


Thanks,
Timo


On 09.04.20 15:53, Gyula Fóra wrote:

Hi All!

We ran into a problem while trying to insert data read from kafka into a 
table sink where some of the columns are not nullable.


The problem is that from Kafka we can only read nullable columns in JSON 
format otherwise you get the following error:


org.apache.flink.table.api.ValidationException: Type STRING NOT NULL of 
table field 'first' does not match with the physical type STRING of the 
'first' field of the TableSource return type.


On the other hand, no matter what we do with the nullable column (things
like select where ... is not null), the type will always be STRING and will
be incompatible with the sink, leading to the following error:


Query schema: [first: STRING, ...]
Sink schema: [first: STRING NOT NULL, ...]

Any idea on how to resolve this type mismatch between nullable and 
non-nullable data? I feel that a query like (select x from y where x is 
not null ) should change the type to not null.


Thanks
Gyula




Re: Possible memory leak in JobManager (Flink 1.10.0)?

2020-04-09 Thread Till Rohrmann
For further reference, I've created this issue [1] to track the problem.

[1] https://issues.apache.org/jira/browse/FLINK-17073

Cheers,
Till

On Thu, Apr 9, 2020 at 1:20 PM Yun Tang  wrote:

> Hi Marc
>
> The leftover 'chk-X' folders, which should be discarded in the final stage of
> removing a checkpoint, could also prove that the not-yet-discarded completed
> checkpoint metadata is what occupies the memory.
>
> If we treat your average checkpoint meta size as 30KB, roughly 20,000
> not-discarded complete checkpoints would occupy about 585MB of memory, which
> is close to your observed scenario.
>
> From my point of view, a checkpoint interval of one second is really too
> frequent and would not make much sense in a production environment.
>
> Best
> Yun Tang
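
A minimal sketch of a more conservative checkpointing setup along the lines Yun
suggests above; the interval and pause values are illustrative placeholders only,
and `env` stands for the job's StreamExecutionEnvironment:
```
// Checkpoint less aggressively than once per second; values are illustrative only.
env.enableCheckpointing(60_000L);                                   // e.g. once a minute
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);   // breathing room between checkpoints
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);           // avoid piling up in-flight checkpoints
```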
> --
> *From:* Till Rohrmann 
> *Sent:* Thursday, April 9, 2020 17:41
> *To:* Marc LEGER 
> *Cc:* Yun Tang ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Possible memory leak in JobManager (Flink 1.10.0)?
>
> Thanks for reporting this issue Marc. From what you've reported, I think
> Yun is right and that the large memory footprint is caused by
> CompletedCheckpoints which cannot be removed fast enough. One way to verify
> this is to enable TRACE logging because then Flink will log for every
> CompletedCheckpoint when it gets discarded. The line should look like this
> "Executing discard procedure for Checkpoint". The high number of chk-X
> folders on S3 could be the result of the slow discard operations.
>
> If you want then we can also take a look at the logs and ideally also the
> heap dump if you can share them with us.
>
> I think one difference between Flink 1.10.0 and 1.7.2 is that we are using
> a fixed thread pool for running the io operations in 1.10.0. The number of
> threads equals the number of cores. In contrast, in Flink 1.7.2 we used a
> fork join pool with a max parallelism of 64. This difference could explain
> the lower throughput of discard operations because fewer can happen in
> parallel.
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER  wrote:
>
> Hello Yun,
>
> Thank you for your feedback, please find below my answers to your
> questions:
>
> 1. I am using incremental state checkpointing with RocksDB backend and AWS
> S3 as a distributed file system, everything is configured in
> flink-conf.yaml as follows:
>
> state.backend: rocksdb
> state.backend.incremental: true
> # placeholders are replaced at deploy time
> state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
> state.backend.rocksdb.localdir: /home/data/flink/rocksdb
>
> Size of _metdata file in a checkpoint folder for the 3 running jobs:
> - job1: 64KB
> - job2: 1K
> - job3: 10K
>
> By the way, I have between 1 and 2 "chk-X" folders per job in S3.
>
> 2. Checkpointing is configured to be triggered every second for all the
> jobs. Only the interval is set, otherwise everything is kept as default:
>
> executionEnvironment.enableCheckpointing(1000);
>
> Best Regards,
> Marc
>
> Le mer. 8 avr. 2020 à 20:48, Yun Tang  a écrit :
>
> Hi Marc
>
> I think the occupied memory is due to the to-remove complete checkpoints
> which are stored in the workQueue of io-executor [1] in
> ZooKeeperCompletedCheckpointStore [2]. One clue to prove this is that
> Executors#newFixedThreadPool would create a ThreadPoolExecutor with a
> LinkedBlockingQueue to store runnables.
>
> To figure out the root cause, would you please check the information below:
>
>1. How large of your checkpoint meta, you could view
>{checkpoint-dir}/chk-X/_metadata to know the size, you could provide what
>state backend you use to help know this.
>2. What is the interval of your checkpoints, a smaller checkpoint
>interval might accumulate many completed checkpoints to subsume once a
>newer checkpoint completes.
>
>
> [1]
> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
> [2]
> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234
>
> Best
> Yun Tang
>
> --
> *From:* Marc LEGER 
> *Sent:* Wednesday, April 8, 2020 16:50
> *To:* user@flink.apache.org 
> *Subject:* Possible memory leak in JobManager (Flink 1.10.0)?
>
> Hello,
>
> I am currently testing Flink 1.10.0 but I am facing memory issues with
> JobManagers deployed in a standalone cluster configured in HA mode with 3
> TaskManagers (and 3 running jobs).
> I do not reproduce the same issues using Flink 1.7.2.
>
> Basically, whatever the value of "jobmanager.heap.size" property is (I
> tried with 2 GB, then 4GB and finally 8GB), the leader JobManager process
> is eventually consuming all available memory and is hanging after a few
> hours or days (depending on the size of the heap) before being deassociated
>

Inserting nullable data into NOT NULL columns

2020-04-09 Thread Gyula Fóra
Hi All!

We ran into a problem while trying to insert data read from kafka into a
table sink where some of the columns are not nullable.

The problem is that from Kafka we can only read nullable columns in JSON
format otherwise you get the following error:

org.apache.flink.table.api.ValidationException: Type STRING NOT NULL
of table field 'first' does not match with the physical type STRING of
the 'first' field of the TableSource return type.

On the other hand, no matter what we do with the nullable column (things
like select where ... is not null), the type will always be STRING and will be
incompatible with the sink, leading to the following error:

Query schema: [first: STRING, ...]
Sink schema: [first: STRING NOT NULL, ...]

Any idea on how to resolve this type mismatch between nullable and
non-nullable data? I feel that a query like (select x from y where x is not
null ) should change the type to not null.

Thanks
Gyula


Re: Re: fink sql client not able to read parquet format table

2020-04-09 Thread Jingsong Li
Hi Lei,

Which hive version did you use?
Can you share the complete hive DDL?

Best,
Jingsong Lee

On Thu, Apr 9, 2020 at 7:15 PM wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> I am using the newest 1.10 blink planner.
>
> Perhaps it is because of the method I used to write the parquet file.
>
> I receive a Kafka message, transform each message into a Java class object,
> write the object to HDFS using StreamingFileSink, and add the HDFS path as a
> partition of the Hive table.
>
> No matter what the order of the field descriptions in the Hive DDL statement
> is, the Hive client will work, as long as the field names are the same as the
> Java object field names.
> But the Flink SQL client will not work.
>
> DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
> final StreamingFileSink<RobotUploadData0101> sink;
> sink = StreamingFileSink
>     .forBulkFormat(new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
>         ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
>     .build();
>
> For example
> RobotUploadData0101 has two fields:  robotId int, robotTime long
>
> CREATE TABLE `robotparquet`(  `robotid` int,  `robottime` bigint ) and
> CREATE TABLE `robotparquet`(  `robottime` bigint,   `robotid` int)
> are the same for the Hive client, but different for the Flink SQL client.
>
> Is this expected behavior?
>
> Thanks,
> Lei
>
> --
> wangl...@geekplus.com.cn
>
>
> *From:* Jark Wu 
> *Date:* 2020-04-09 14:48
> *To:* wangl...@geekplus.com.cn; Jingsong Li ;
> lirui 
> *CC:* user 
> *Subject:* Re: fink sql client not able to read parquet format table
> Hi Lei,
>
> Are you using the newest 1.10 blink planner?
>
> I'm not familiar with Hive and parquet, but I know @Jingsong Li
>  and @li...@apache.org  are
> experts on this. Maybe they can help on this question.
>
> Best,
> Jark
>
> On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn <
> wangl...@geekplus.com.cn> wrote:
>
>>
>> Hive table stored as parquet.
>>
>> Under hive client:
>> hive> select robotid from robotparquet limit 2;
>> OK
>> 1291097
>> 1291044
>>
>>
>> But under flink sql-client the result is 0
>> Flink SQL> select robotid  from robotparquet limit 2;
>>   robotid
>>  0
>>  0
>>
>> Any insight on this?
>>
>> Thanks,
>> Lei
>>
>>
>>
>> --
>> wangl...@geekplus.com.cn
>>
>>

-- 
Best, Jingsong Lee


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Kostas Kloudas
I would say so, yes.
Also, could you set the paths where you want to use Presto to "s3p", as
described in [1], just to be sure that there is no ambiguity.

You could also make use of [2].

And thanks for looking into it!

Cheers,
Kostas

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#s3-specific
[2] 
https://cwiki.apache.org/confluence/display/FLINK/Remote+Debugging+of+Flink+Clusters
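
For illustration, here is a small Java sketch (bucket and path names are placeholders) of the separation suggested above when both flink-s3-fs-hadoop and flink-s3-fs-presto are installed as plugins: address the StreamingFileSink through the "s3a://" scheme and the Presto-backed paths (e.g. checkpoints) through "s3p://", so there is no ambiguity about which filesystem implementation serves which path.

```
// A sketch, not a definitive setup: the sink uses the Hadoop S3 filesystem ("s3a://"),
// while Presto-backed paths such as checkpoints would use "s3p://".
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3SchemeSketch {
    public static StreamingFileSink<GenericRecord> buildSink(Schema schema) {
        // Bulk Parquet sink on the Hadoop-based S3 filesystem.
        return StreamingFileSink
                .forBulkFormat(new Path("s3a://my-bucket/bro_conn/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();
    }
    // Checkpoints could then be pointed at the Presto filesystem, e.g. in
    // flink-conf.yaml: state.checkpoints.dir: s3p://my-bucket/checkpoints
}
```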

On Thu, Apr 9, 2020 at 2:50 PM Roshan Punnoose  wrote:
>
> Btw, I ran the same exact code on a local Flink cluster run with 
> `./bin/start-cluster.sh` on my local machine. With `s3a` it did not work, the 
> part files do not roll over; however, with the local filesystem it works 
> perfectly. Should I be looking at the S3Committer in Flink to see if there is 
> something odd going on?
>
> On Thu, Apr 9, 2020 at 7:49 AM Roshan Punnoose  wrote:
>>
>> Nope just the s3a. I'll keep looking around to see if there is anything else 
>> I can see. If you think of anything else to try, let me know.
>>
>> On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas  wrote:
>>>
>>> It should not be a problem because from what you posted, you are using
>>> "s3a" as the scheme for s3.
>>> Are you using "s3p" for Presto? This should also be done in order for
>>> Flink to understand where to use the one or the other.
>>>
>>> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose  wrote:
>>> >
>>> > Lastly, could it be the way I built the flink image for kube? I added 
>>> > both the presto and Hadoop plugins
>>> >
>>> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose  wrote:
>>> >>
>>> >> Sorry realized this came off the user list by mistake. Adding the thread 
>>> >> back in.
>>> >>
>>> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose  wrote:
>>> >>>
>>> >>> Yes sorry, no errors on the task manager. However, I am new to flink so 
>>> >>> don't know all the places to look for the logs. Been looking at the 
>>> >>> task manager logs and don't see any exceptions there. Not sure where to 
>>> >>> look for s3 exceptions in particular.
>>> >>>
>>> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas  wrote:
>>> 
>>>  Yes, this is why I reached out for further information.
>>> 
>>>  Incrementing the part counter is the responsibility of the
>>>  StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>>  in the local FS.
>>>  Now if it is on the S3 side, it would help if you have any more info,
>>>  for example any logs from S3, to see if anything went wrong on their
>>>  end.
>>> 
>>>  So your logs refer to normal execution, i.e. no failures and no
>>>  restarting, right?
>>> 
>>>  Cheers,
>>>  Kostas
>>> 
>>>  On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose  
>>>  wrote:
>>>  >
>>>  > Surprisingly the same code running against the local filesystem 
>>>  > works perfectly. The part counter increments correctly.
>>>  >
>>>  > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas  
>>>  > wrote:
>>>  >>
>>>  >> Hi Roshan,
>>>  >>
>>>  >> Your logs refer to a simple run without any failures or re-running
>>>  >> from a savepoint, right?
>>>  >>
>>>  >> I am asking because I am trying to reproduce it by running a 
>>>  >> modified
>>>  >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>>  >> The ITCase runs against the local filesystem, and not S3, but I 
>>>  >> added
>>>  >> the OutputFileConfig and it seems that the part counter is increases
>>>  >> as expected.
>>>  >>
>>>  >> Is there any other information that would help us reproduce the 
>>>  >> issue?
>>>  >>
>>>  >> Cheers,
>>>  >> Kostas
>>>  >>
>>>  >> [1] 
>>>  >> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>>  >>
>>>  >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose  
>>>  >> wrote:
>>>  >> >
>>>  >> > Hi,
>>>  >> >
>>>  >> > I am trying to get the parquet writer to write to s3; however, 
>>>  >> > the files do not seem to be rolling over. The same file 
>>>  >> > "part-0-0.parquet" is being created each time. Like the 
>>>  >> > 'partCounter" is not being updated? Maybe the Bucket is being 
>>>  >> > recreated each time? I don't really know... Here are some logs:
>>>  >> >
>>>  >> > 2020-04-09 01:28:10,350 INFO 
>>>  >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets 
>>>  >> > - Subtask 0 checkpointing for checkpoint with id=2 (max part 
>>>  >> > counter=2).
>>>  >> > 2020-04-09 01:28:10,589 INFO 
>>>  >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets 
>>>  >> > - Subtask 0 received completion notification for checkpoint with 
>>>  >> > id=2.
>>>  >> > 2020-04-09 01:28:10,589 INFO 
>>>  >> > org.apache.flink.fs.s3.common.writer.S3Comm

Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

2020-04-09 Thread Zhijiang
Great work! Thanks, Gordon, for the continuous efforts in enhancing Stateful 
Functions and for the efficient release!
I hope Stateful Functions becomes more and more popular among users.

Best,
Zhijiang


--
From:Yun Tang 
Send Time:2020 Apr. 9 (Thu.) 00:17
To:Till Rohrmann ; dev 
Cc:Oytun Tez ; user 
Subject:Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

Excited to see the stateful functions release!
Thanks for the great work of manager Gordon and everyone who ever contributed 
to this.

Best
Yun Tang

From: Till Rohrmann 
Sent: Wednesday, April 8, 2020 14:30
To: dev 
Cc: Oytun Tez ; user 
Subject: Re: [ANNOUNCE] Apache Flink Stateful Functions 2.0.0 released

Great news! Thanks a lot for being our release manager Gordon and to everyone 
who helped with the release.

Cheers,
Till

On Wed, Apr 8, 2020 at 3:57 AM Congxian Qiu 
mailto:qcx978132...@gmail.com>> wrote:
Thanks a lot for the release and your great job, Gordon!
Also thanks to everyone who made this release possible!

Best,
Congxian


Oytun Tez mailto:oy...@motaword.com>> wrote on Wed, Apr 8, 2020 at 2:55 AM:

> I should also add, I couldn't agree more with this sentence in the release
> article: "state access/updates and messaging need to be integrated."
>
> This is something we strictly enforce in our Flink case, where we do not
> refer to anything external for storage, use Flink as our DB.
>
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>
>
> On Tue, Apr 7, 2020 at 12:26 PM Oytun Tez 
> mailto:oy...@motaword.com>> wrote:
>
>> Great news! Thank you all.
>>
>> On Tue, Apr 7, 2020 at 12:23 PM Marta Paes Moreira 
>> mailto:ma...@ververica.com>>
>> wrote:
>>
>>> Thank you for managing the release, Gordon — you did a tremendous job!
>>> And to everyone else who worked on pushing it through.
>>>
>>> Really excited about the new use cases that StateFun 2.0 unlocks for
>>> Flink users and beyond!
>>>
>>>
>>> Marta
>>>
>>> On Tue, Apr 7, 2020 at 4:47 PM Hequn Cheng 
>>> mailto:he...@apache.org>> wrote:
>>>
 Thanks a lot for the release and your great job, Gordon!
 Also thanks to everyone who made this release possible!

 Best,
 Hequn

 On Tue, Apr 7, 2020 at 8:58 PM Tzu-Li (Gordon) Tai 
 mailto:tzuli...@apache.org>>
 wrote:

> The Apache Flink community is very happy to announce the release of
> Apache Flink Stateful Functions 2.0.0.
>
> Stateful Functions is an API that simplifies building distributed
> stateful applications.
> It's based on functions with persistent state that can interact
> dynamically with strong consistency guarantees.
>
> Please check out the release blog post for an overview of the release:
> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Maven artifacts for Stateful Functions can be found at:
> https://search.maven.org/search?q=g:org.apache.flink%20statefun
>
> Python SDK for Stateful Functions published to the PyPI index can be
> found at:
> https://pypi.org/project/apache-flink-statefun/
>
> Official Docker image for building Stateful Functions applications is
> currently being published to Docker Hub.
> Dockerfiles for this release can be found at:
> https://github.com/apache/flink-statefun-docker/tree/master/2.0.0
> Progress for creating the Docker Hub repository can be tracked at:
> https://github.com/docker-library/official-images/pull/7749
>
> The full release notes are available in Jira:
>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12346878
>
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>
> Cheers,
> Gordon
>
 --
>>  --
>>
>> [image: MotaWord]
>> Oytun Tez
>> M O T A W O R D | CTO & Co-Founder
>> oy...@motaword.com
>>
>>   
>>
>



Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Roshan Punnoose
Btw, I ran the same exact code on a local Flink cluster run with
`./bin/start-cluster.sh` on my local machine. With `s3a` it did not work,
the part files do not roll over; however, with the local filesystem it
works perfectly. Should I be looking at the S3Committer in Flink to see if
there is something odd going on?

On Thu, Apr 9, 2020 at 7:49 AM Roshan Punnoose  wrote:

> Nope just the s3a. I'll keep looking around to see if there is anything
> else I can see. If you think of anything else to try, let me know.
>
> On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas  wrote:
>
>> It should not be a problem because from what you posted, you are using
>> "s3a" as the scheme for s3.
>> Are you using "s3p" for Presto? This should also be done in order for
>> Flink to understand where to use the one or the other.
>>
>> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose  wrote:
>> >
>> > Lastly, could it be the way I built the flink image for kube? I added
>> both the presto and Hadoop plugins
>> >
>> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose  wrote:
>> >>
>> >> Sorry realized this came off the user list by mistake. Adding the
>> thread back in.
>> >>
>> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose 
>> wrote:
>> >>>
>> >>> Yes sorry, no errors on the task manager. However, I am new to flink
>> so don't know all the places to look for the logs. Been looking at the task
>> manager logs and don't see any exceptions there. Not sure where to look for
>> s3 exceptions in particular.
>> >>>
>> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas 
>> wrote:
>> 
>>  Yes, this is why I reached out for further information.
>> 
>>  Incrementing the part counter is the responsibility of the
>>  StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>  in the local FS.
>>  Now if it is on the S3 side, it would help if you have any more info,
>>  for example any logs from S3, to see if anything went wrong on their
>>  end.
>> 
>>  So your logs refer to normal execution, i.e. no failures and no
>>  restarting, right?
>> 
>>  Cheers,
>>  Kostas
>> 
>>  On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose 
>> wrote:
>>  >
>>  > Surprisingly the same code running against the local filesystem
>> works perfectly. The part counter increments correctly.
>>  >
>>  > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas 
>> wrote:
>>  >>
>>  >> Hi Roshan,
>>  >>
>>  >> Your logs refer to a simple run without any failures or re-running
>>  >> from a savepoint, right?
>>  >>
>>  >> I am asking because I am trying to reproduce it by running a
>> modified
>>  >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>  >> The ITCase runs against the local filesystem, and not S3, but I
>> added
>>  >> the OutputFileConfig and it seems that the part counter is
>> increases
>>  >> as expected.
>>  >>
>>  >> Is there any other information that would help us reproduce the
>> issue?
>>  >>
>>  >> Cheers,
>>  >> Kostas
>>  >>
>>  >> [1]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>  >>
>>  >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose 
>> wrote:
>>  >> >
>>  >> > Hi,
>>  >> >
>>  >> > I am trying to get the parquet writer to write to s3; however,
>> the files do not seem to be rolling over. The same file "part-0-0.parquet"
>> is being created each time. Like the 'partCounter" is not being updated?
>> Maybe the Bucket is being recreated each time? I don't really know... Here
>> are some logs:
>>  >> >
>>  >> > 2020-04-09 01:28:10,350 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>  >> > 2020-04-09 01:28:10,589 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 received completion notification for checkpoint with id=2.
>>  >> > 2020-04-09 01:28:10,589 INFO
>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
>> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>  >> > 2020-04-09 01:29:10,350 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 checkpointing for checkpoint with id=3 (max part counter=3).
>>  >> > 2020-04-09 01:29:10,520 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 received completion notification for checkpoint with id=3.
>>  >> > 2020-04-09 01:29:10,521 INFO
>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
>> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6

Re: Making job fail on Checkpoint Expired?

2020-04-09 Thread Robin Cassan
Hello again Congxian,

Thank you so much for your advice, it is really helpful! We have managed to
pinpoint that most of our problems occur because of disk pressure, most
likely due to the usage of EBS, we will try again with local SSDs.
Digging deeper into the "snowball effect on incremental checkpoint
timeouts", I am wondering two things:

- Would it help to use concurrent checkpointing? In our current tests,
Flink waits for the previous checkpoint to finish before starting the next
one, so if the previous one has expired, the next one will be twice as big.
But if we enable concurrent checkpoints, is it correct to assume that the
checkpoint sizes should be more consistent? More precisely, if a second
checkpoint is triggered during the first checkpoint, the size of the first
checkpoint is fixed because new barriers are injected, and if the first
checkpoint expires it would be retried with the same amount of data?

- I am also wondering if there is a way for long checkpoints to create
backpressure on the rest of the stream? This would be a nice feature to
have, since it would avoid the state growing too much when checkpointing
takes time because of temporary network issues for example.
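
As a point of reference, here is a minimal Java sketch (the values are illustrative placeholders, not a recommendation) of the checkpointing knobs discussed in this thread: the checkpoint interval, the number of concurrent checkpoints, the checkpoint timeout, and the tolerable failure count mentioned earlier.

```
// A minimal sketch of the checkpointing settings discussed above; the concrete
// numbers are placeholders and would need tuning for a real job.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000L);                                 // trigger every 60s
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);         // allow overlapping checkpoints
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000L);     // expire after 10 minutes
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); // as mentioned earlier in the thread

        // ... define sources, transformations, sinks, then env.execute(...);
    }
}
```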

Thanks for your help!
Robin

On Wed, Apr 8, 2020 at 5:30 AM, Congxian Qiu  wrote:

> Hi Robin
> Thanks for the detailed reply, and sorry for my late reply.
> I think that your request to fail the whole job when continues checkpoint
> expired is valid, I've created an issue to track this[1]
>
> For now, maybe the following steps can help you find out the reason of
> time out
>
> 1. You can find out the "not ack subtask" in checkpoint ui, (maybe it
> called A)
> 2. find out A is under backpressure now?
> 2.1. if A is under backpressure, please fix it
> 2.2 if A is not under backpressure, you can go to the tm log of A to find
> out something abnormal(maybe you need to enable the debug log in this step)
>
> for the snapshot in TM side, it contains 1) barrier align (exactly-once
> mode, at least once no need to align the barrier); 2) synchronize
> procedure; 3)asynchronize procedure;
>
> backpressure will affect step 1, too many timers/cpu consumption too
> high/disk utilization too high may affect step 2; 3) disk
> performance/network performance may affect step 3;
>
> [1] https://issues.apache.org/jira/browse/FLINK-17043
> Best,
> Congxian
>
>
> Robin Cassan  wrote on Fri, Apr 3, 2020 at 8:35 PM:
>
>> Hi Congxian,
>>
>> Thanks for confirming! The reason I want this behavior is because we are
>> currently investigating issues with checkpoints that keep getting timeouts
>> after the job has been running for a few hours. We observed that, after a
>> few timeouts, if the job was being restarted because of a lost TM for
>> example, the next checkpoints would be working for a few more hours.
>> However, if the job continues running and consuming more data, the next
>> checkpoints will be even bigger and the chances of them completing in time
>> are getting even thinner.
>> Crashing the job is not a viable solution I agree, but it would allow us
>> to generate data during the time we investigate the root cause of the
>> timeouts.
>>
>> I believe that having the option to make the job restart after a few
>> checkpoint timeouts would still help to avoid the snowball effect of
>> incremental checkpoints being bigger and bigger if the checkpoints keep
>> getting expired.
>>
>> I'd love to get your opinion on this!
>>
>> Thanks,
>> Robin
>>
>> On Fri, Apr 3, 2020 at 11:17 AM, Congxian Qiu  wrote:
>>
>>> Currently, only checkpoint declined will be counted into
>>> `continuousFailureCounter`.
>>> Could you please share why do you want the job to fail when checkpoint
>>> expired?
>>>
>>> Best,
>>> Congxian
>>>
>>>
>>> Timo Walther  wrote on Thu, Apr 2, 2020 at 11:23 PM:
>>>
 Hi Robin,

 this is a very good observation and maybe even unintended behavior.
 Maybe Arvid in CC is more familiar with the checkpointing?

 Regards,
 Timo


 On 02.04.20 15:37, Robin Cassan wrote:
 > Hi all,
 >
 > I am wondering if there is a way to make a flink job fail (not cancel
 > it) when one or several checkpoints have failed due to being expired
 > (taking longer than the timeout) ?
 > I am using Flink 1.9.2 and have set
 > `*setTolerableCheckpointFailureNumber(1)*` which doesn't do the
 trick.
 > Looking into the CheckpointFailureManager.java class, it looks like
 this
 > only works when the checkpoint failure reason is
 > `*CHECKPOINT_DECLINED*`, but the number of failures isn't incremented
 on
 > `*CHECKPOINT_EXPIRED*`.
 > Am I missing something?
 >
 > Thanks!




Re: Upgrade of Cassandra driver in connector

2020-04-09 Thread Ismaël Mejía
Just for info, the Cassandra client driver version 4 does not leak
Guava anymore, but it represents a considerable API change, so it may be
worth exploring that as an alternative to 'modernize' the connector.

On Thu, Apr 9, 2020 at 1:54 PM Aljoscha Krettek  wrote:
>
> Hi Thomas!
>
> On 09.04.20 11:35, Thms Hmm wrote:
> > Hey,
> > are there any plans to upgrade the version of the Cassandra driver to a
> > newer one?
>
> There is this Jira issue, along with a PR:
> https://issues.apache.org/jira/browse/FLINK-8424. But no-one has worked
> on it for a while. There were some difficulties in getting it to work
> even with that "minor" update. I'd be very happy if someone with more
> Cassandra experience would pick up this issue.
>
> > Also the shading makes it not possible to easily exchange the driver. Is
> > that correct?
>
> Yes, the shading makes it impossible to exchange the driver, I'd say.
>
> Best,
> Aljoscha


Re: Flink job didn't restart when a task failed

2020-04-09 Thread Aljoscha Krettek

Hi,

this indeed seems very strange!

@Gary Could you maybe have a look at this since you work/worked quite a 
bit on the scheduler?


Best,
Aljoscha

On 09.04.20 05:46, Hanson, Bruce wrote:

Hello Flink folks:

We had a problem with a Flink job the other day that I haven’t seen before. One 
task encountered a failure and switched to FAILED (see the full exception 
below). After the failure, the task said it was notifying the Job Manager:

2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283] level=INFO 
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering task and 
sending final execution state FAILED to JobManager for task 
FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.

But I see no evidence that the Job Manager got the message. I would expect with 
this type of failure that the Job Manager would restart the job. In this case, 
the job carried on, hobbled, until it stopped processing data and our user had 
to manually restart the job. The job also started experiencing checkpoint 
timeouts on every checkpoint due to this operator stopping.

Had the job restarted when this happened, I believe everything would have been 
ok as the job had an appropriate restart strategy in place. The Task Manager 
that this task was running on remained healthy and was actively processing 
other tasks.
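
For context, a restart strategy of the kind referred to above is typically configured along these lines; this is only an illustrative sketch with placeholder values, not the poster's actual settings.

```
// A sketch (illustrative values) of a fixed-delay restart strategy, which is the
// mechanism that should have restarted the job once the task switched to FAILED.
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Retry the job up to 3 times, waiting 10 seconds between attempts.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}
```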

It seems like this is some kind of a bug. Is this something anyone has seen 
before? Could it be something that has been fixed if we went to Flink 1.10?

We are running Flink 1.7.2. I know it’s rather old now. We run a managed 
environment where users can run their jobs, and are in the process of upgrading 
to 1.10.

This is the full exception that started the problem:

2020-04-06 08:21:04.297 [FOG_PREDICTION_FUNCTION (15/20)] level=INFO  
org.apache.flink.runtime.taskmanager.Task  - FOG_PREDICTION_FUNCTION (15/20) 
(3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: 
Connection timed out (connection to '/100.112.98.121:36256')
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:87)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1401)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:953)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:

Re: Upgrade of Cassandra driver in connector

2020-04-09 Thread Aljoscha Krettek

Hi Thomas!

On 09.04.20 11:35, Thms Hmm wrote:

Hey,
are there any plans to upgrade the version of the Cassandra driver to a
newer one?


There is this Jira issue, along with a PR: 
https://issues.apache.org/jira/browse/FLINK-8424. But no-one has worked 
on it for a while. There were some difficulties in getting it to work 
even with that "minor" update. I'd be very happy if someone with more 
Cassandra experience would pick up this issue.



Also the shading makes it not possible to easily exchange the driver. Is
that correct?


Yes, the shading makes it impossible to exchange the driver, I'd say.

Best,
Aljoscha


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Roshan Punnoose
Nope just the s3a. I'll keep looking around to see if there is anything
else I can see. If you think of anything else to try, let me know.

On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas  wrote:

> It should not be a problem because from what you posted, you are using
> "s3a" as the scheme for s3.
> Are you using "s3p" for Presto? This should also be done in order for
> Flink to understand where to use the one or the other.
>
> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose  wrote:
> >
> > Lastly, could it be the way I built the flink image for kube? I added
> both the presto and Hadoop plugins
> >
> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose  wrote:
> >>
> >> Sorry realized this came off the user list by mistake. Adding the
> thread back in.
> >>
> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose  wrote:
> >>>
> >>> Yes sorry, no errors on the task manager. However, I am new to flink
> so don't know all the places to look for the logs. Been looking at the task
> manager logs and don't see any exceptions there. Not sure where to look for
> s3 exceptions in particular.
> >>>
> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas 
> wrote:
> 
>  Yes, this is why I reached out for further information.
> 
>  Incrementing the part counter is the responsibility of the
>  StreamingFileSink, whose code is FS-agnostic, so it should also fail
>  in the local FS.
>  Now if it is on the S3 side, it would help if you have any more info,
>  for example any logs from S3, to see if anything went wrong on their
>  end.
> 
>  So your logs refer to normal execution, i.e. no failures and no
>  restarting, right?
> 
>  Cheers,
>  Kostas
> 
>  On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose 
> wrote:
>  >
>  > Surprisingly the same code running against the local filesystem
> works perfectly. The part counter increments correctly.
>  >
>  > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas 
> wrote:
>  >>
>  >> Hi Roshan,
>  >>
>  >> Your logs refer to a simple run without any failures or re-running
>  >> from a savepoint, right?
>  >>
>  >> I am asking because I am trying to reproduce it by running a
> modified
>  >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>  >> The ITCase runs against the local filesystem, and not S3, but I
> added
>  >> the OutputFileConfig and it seems that the part counter is
> increases
>  >> as expected.
>  >>
>  >> Is there any other information that would help us reproduce the
> issue?
>  >>
>  >> Cheers,
>  >> Kostas
>  >>
>  >> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>  >>
>  >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose 
> wrote:
>  >> >
>  >> > Hi,
>  >> >
>  >> > I am trying to get the parquet writer to write to s3; however,
> the files do not seem to be rolling over. The same file "part-0-0.parquet"
> is being created each time. Like the 'partCounter" is not being updated?
> Maybe the Bucket is being recreated each time? I don't really know... Here
> are some logs:
>  >> >
>  >> > 2020-04-09 01:28:10,350 INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
> 0 checkpointing for checkpoint with id=2 (max part counter=2).
>  >> > 2020-04-09 01:28:10,589 INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
> 0 received completion notification for checkpoint with id=2.
>  >> > 2020-04-09 01:28:10,589 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>  >> > 2020-04-09 01:29:10,350 INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
> 0 checkpointing for checkpoint with id=3 (max part counter=3).
>  >> > 2020-04-09 01:29:10,520 INFO
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
> 0 received completion notification for checkpoint with id=3.
>  >> > 2020-04-09 01:29:10,521 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>  >> > And a part of my code:
>  >> >
>  >> > ```
>  >> >
>  >> > StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>  >> >
>  >> > //env.setParallelism(2);
>  >> > env.enableCheckpointing(6L);
>  >> > ///PROPERTIES Added
>  >> > Schema schema = bro_conn.getClassSchema();
>  >> >
>  >> > OutputFileConfig confi

Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Kostas Kloudas
It should not be a problem because from what you posted, you are using
"s3a" as the scheme for s3.
Are you using "s3p" for Presto? This should also be done in order for
Flink to understand where to use the one or the other.

On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose  wrote:
>
> Lastly, could it be the way I built the flink image for kube? I added both 
> the presto and Hadoop plugins
>
> On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose  wrote:
>>
>> Sorry realized this came off the user list by mistake. Adding the thread 
>> back in.
>>
>> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose  wrote:
>>>
>>> Yes sorry, no errors on the task manager. However, I am new to flink so 
>>> don't know all the places to look for the logs. Been looking at the task 
>>> manager logs and don't see any exceptions there. Not sure where to look for 
>>> s3 exceptions in particular.
>>>
>>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas  wrote:

 Yes, this is why I reached out for further information.

 Incrementing the part counter is the responsibility of the
 StreamingFileSink, whose code is FS-agnostic, so it should also fail
 in the local FS.
 Now if it is on the S3 side, it would help if you have any more info,
 for example any logs from S3, to see if anything went wrong on their
 end.

 So your logs refer to normal execution, i.e. no failures and no
 restarting, right?

 Cheers,
 Kostas

 On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose  wrote:
 >
 > Surprisingly the same code running against the local filesystem works 
 > perfectly. The part counter increments correctly.
 >
 > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas  wrote:
 >>
 >> Hi Roshan,
 >>
 >> Your logs refer to a simple run without any failures or re-running
 >> from a savepoint, right?
 >>
 >> I am asking because I am trying to reproduce it by running a modified
 >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
 >> The ITCase runs against the local filesystem, and not S3, but I added
 >> the OutputFileConfig and it seems that the part counter is increases
 >> as expected.
 >>
 >> Is there any other information that would help us reproduce the issue?
 >>
 >> Cheers,
 >> Kostas
 >>
 >> [1] 
 >> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
 >>
 >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose  
 >> wrote:
 >> >
 >> > Hi,
 >> >
 >> > I am trying to get the parquet writer to write to s3; however, the 
 >> > files do not seem to be rolling over. The same file 
 >> > "part-0-0.parquet" is being created each time. Like the 'partCounter" 
 >> > is not being updated? Maybe the Bucket is being recreated each time? 
 >> > I don't really know... Here are some logs:
 >> >
 >> > 2020-04-09 01:28:10,350 INFO 
 >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
 >> > Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
 >> > 2020-04-09 01:28:10,589 INFO 
 >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
 >> > Subtask 0 received completion notification for checkpoint with id=2.
 >> > 2020-04-09 01:28:10,589 INFO 
 >> > org.apache.flink.fs.s3.common.writer.S3Committer - Committing 
 >> > bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
 >> > Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
 >> > 2020-04-09 01:29:10,350 INFO 
 >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
 >> > Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
 >> > 2020-04-09 01:29:10,520 INFO 
 >> > org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - 
 >> > Subtask 0 received completion notification for checkpoint with id=3.
 >> > 2020-04-09 01:29:10,521 INFO 
 >> > org.apache.flink.fs.s3.common.writer.S3Committer - Committing 
 >> > bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
 >> > Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
 >> > And a part of my code:
 >> >
 >> > ```
 >> >
 >> > StreamExecutionEnvironment env = 
 >> > StreamExecutionEnvironment.getExecutionEnvironment();
 >> >
 >> > //env.setParallelism(2);
 >> > env.enableCheckpointing(6L);
 >> > ///PROPERTIES Added
 >> > Schema schema = bro_conn.getClassSchema();
 >> >
 >> > OutputFileConfig config = OutputFileConfig
 >> > .builder()
 >> > .withPartSuffix(".parquet")
 >> > .build();
 >> >
 >> >

Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Roshan Punnoose
Lastly, could it be the way I built the flink image for kube? I added both
the presto and Hadoop plugins

On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose  wrote:

> Sorry realized this came off the user list by mistake. Adding the thread
> back in.
>
> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose  wrote:
>
>> Yes sorry, no errors on the task manager. However, I am new to flink so
>> don't know all the places to look for the logs. Been looking at the task
>> manager logs and don't see any exceptions there. Not sure where to look for
>> s3 exceptions in particular.
>>
>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas  wrote:
>>
>>> Yes, this is why I reached out for further information.
>>>
>>> Incrementing the part counter is the responsibility of the
>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>>> in the local FS.
>>> Now if it is on the S3 side, it would help if you have any more info,
>>> for example any logs from S3, to see if anything went wrong on their
>>> end.
>>>
>>> So your logs refer to normal execution, i.e. no failures and no
>>> restarting, right?
>>>
>>> Cheers,
>>> Kostas
>>>
>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose 
>>> wrote:
>>> >
>>> > Surprisingly the same code running against the local filesystem works
>>> perfectly. The part counter increments correctly.
>>> >
>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas 
>>> wrote:
>>> >>
>>> >> Hi Roshan,
>>> >>
>>> >> Your logs refer to a simple run without any failures or re-running
>>> >> from a savepoint, right?
>>> >>
>>> >> I am asking because I am trying to reproduce it by running a modified
>>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>>> >> The ITCase runs against the local filesystem, and not S3, but I added
>>> >> the OutputFileConfig and it seems that the part counter is increases
>>> >> as expected.
>>> >>
>>> >> Is there any other information that would help us reproduce the issue?
>>> >>
>>> >> Cheers,
>>> >> Kostas
>>> >>
>>> >> [1]
>>> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>>> >>
>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose 
>>> wrote:
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > I am trying to get the parquet writer to write to s3; however, the
>>> files do not seem to be rolling over. The same file "part-0-0.parquet" is
>>> being created each time. Like the 'partCounter" is not being updated? Maybe
>>> the Bucket is being recreated each time? I don't really know... Here are
>>> some logs:
>>> >> >
>>> >> > 2020-04-09 01:28:10,350 INFO
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>>> 0 checkpointing for checkpoint with id=2 (max part counter=2).
>>> >> > 2020-04-09 01:28:10,589 INFO
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>>> 0 received completion notification for checkpoint with id=2.
>>> >> > 2020-04-09 01:28:10,589 INFO
>>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>>> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
>>> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >> > 2020-04-09 01:29:10,350 INFO
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>>> 0 checkpointing for checkpoint with id=3 (max part counter=3).
>>> >> > 2020-04-09 01:29:10,520 INFO
>>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>>> 0 received completion notification for checkpoint with id=3.
>>> >> > 2020-04-09 01:29:10,521 INFO
>>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>>> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
>>> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>>> >> > And a part of my code:
>>> >> >
>>> >> > ```
>>> >> >
>>> >> > StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> >> >
>>> >> > //env.setParallelism(2);
>>> >> > env.enableCheckpointing(6L);
>>> >> > ///PROPERTIES Added
>>> >> > Schema schema = bro_conn.getClassSchema();
>>> >> >
>>> >> > OutputFileConfig config = OutputFileConfig
>>> >> > .builder()
>>> >> > .withPartSuffix(".parquet")
>>> >> > .build();
>>> >> >
>>> >> > final StreamingFileSink sink =
>>> StreamingFileSink
>>> >> > .forBulkFormat(new
>>> Path("s3a:///bro_conn/"),
>>> ParquetAvroWriters.forGenericRecord(schema))
>>> >> > //
>>> .withRollingPolicy(OnCheckpointRollingPolicy.build())
>>> >> > .withOutputFileConfig(config)
>>> >> > //.withBucketAssigner(new
>>> PartitioningBucketAssigner())
>>> >> > .build();
>>> >> >
>>> >> > DataStream kinesis = env.addSource(new
>>> FlinkKinesisConsum

Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Roshan Punnoose
Sorry realized this came off the user list by mistake. Adding the thread
back in.

On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose  wrote:

> Yes sorry, no errors on the task manager. However, I am new to flink so
> don't know all the places to look for the logs. Been looking at the task
> manager logs and don't see any exceptions there. Not sure where to look for
> s3 exceptions in particular.
>
> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas  wrote:
>
>> Yes, this is why I reached out for further information.
>>
>> Incrementing the part counter is the responsibility of the
>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>> in the local FS.
>> Now if it is on the S3 side, it would help if you have any more info,
>> for example any logs from S3, to see if anything went wrong on their
>> end.
>>
>> So your logs refer to normal execution, i.e. no failures and no
>> restarting, right?
>>
>> Cheers,
>> Kostas
>>
>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose 
>> wrote:
>> >
>> > Surprisingly the same code running against the local filesystem works
>> perfectly. The part counter increments correctly.
>> >
>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas  wrote:
>> >>
>> >> Hi Roshan,
>> >>
>> >> Your logs refer to a simple run without any failures or re-running
>> >> from a savepoint, right?
>> >>
>> >> I am asking because I am trying to reproduce it by running a modified
>> >> ParquetStreamingFileSinkITCase [1] and so far I cannot.
>> >> The ITCase runs against the local filesystem, and not S3, but I added
>> >> the OutputFileConfig and it seems that the part counter is increases
>> >> as expected.
>> >>
>> >> Is there any other information that would help us reproduce the issue?
>> >>
>> >> Cheers,
>> >> Kostas
>> >>
>> >> [1]
>> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>> >>
>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose 
>> wrote:
>> >> >
>> >> > Hi,
>> >> >
>> >> > I am trying to get the parquet writer to write to s3; however, the
>> files do not seem to be rolling over. The same file "part-0-0.parquet" is
>> being created each time. Like the 'partCounter" is not being updated? Maybe
>> the Bucket is being recreated each time? I don't really know... Here are
>> some logs:
>> >> >
>> >> > 2020-04-09 01:28:10,350 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 checkpointing for checkpoint with id=2 (max part counter=2).
>> >> > 2020-04-09 01:28:10,589 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 received completion notification for checkpoint with id=2.
>> >> > 2020-04-09 01:28:10,589 INFO
>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
>> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >> > 2020-04-09 01:29:10,350 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 checkpointing for checkpoint with id=3 (max part counter=3).
>> >> > 2020-04-09 01:29:10,520 INFO
>> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask
>> 0 received completion notification for checkpoint with id=3.
>> >> > 2020-04-09 01:29:10,521 INFO
>> org.apache.flink.fs.s3.common.writer.S3Committer - Committing
>> bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID
>> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >> > And a part of my code:
>> >> >
>> >> > ```
>> >> >
>> >> > StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> >> >
>> >> > //env.setParallelism(2);
>> >> > env.enableCheckpointing(6L);
>> >> > ///PROPERTIES Added
>> >> > Schema schema = bro_conn.getClassSchema();
>> >> >
>> >> > OutputFileConfig config = OutputFileConfig
>> >> > .builder()
>> >> > .withPartSuffix(".parquet")
>> >> > .build();
>> >> >
>> >> > final StreamingFileSink sink =
>> StreamingFileSink
>> >> > .forBulkFormat(new Path("s3a:///bro_conn/"),
>> ParquetAvroWriters.forGenericRecord(schema))
>> >> > //
>> .withRollingPolicy(OnCheckpointRollingPolicy.build())
>> >> > .withOutputFileConfig(config)
>> >> > //.withBucketAssigner(new
>> PartitioningBucketAssigner())
>> >> > .build();
>> >> >
>> >> > DataStream kinesis = env.addSource(new
>> FlinkKinesisConsumer<>(
>> >> > "kinesis", new SimpleStringSchema(),
>> consumerConfig));
>> >> >
>> >> > kinesis.flatMap(new JsonAvroParser())
>> >> > .addSink(sink);
>> >> >
>> >> >
>> >> > env.execute("Bro Conn");
>> >> >
>> >> > ```
>> >> >
>> >> > I'm

Re: Possible memory leak in JobManager (Flink 1.10.0)?

2020-04-09 Thread Yun Tang
Hi Marc

The leftover 'chk-X' folders, which should be discarded when a checkpoint is 
removed at the final stage, could also prove that the metadata of those 
not-yet-discarded completed checkpoints is occupying the memory.

If we treat your average checkpoint meta size as 30 KB, roughly 20,000 
not-discarded completed checkpoints would occupy about 585 MB of memory 
(20,000 × 30 KB ≈ 585 MB), which is close to your observed scenario.

From my point of view, a checkpoint interval of one second is really too 
frequent and would not make much sense in a production environment.

Best
Yun Tang

From: Till Rohrmann 
Sent: Thursday, April 9, 2020 17:41
To: Marc LEGER 
Cc: Yun Tang ; user@flink.apache.org 
Subject: Re: Possible memory leak in JobManager (Flink 1.10.0)?

Thanks for reporting this issue Marc. From what you've reported, I think Yun is 
right and that the large memory footprint is caused by CompletedCheckpoints 
which cannot be removed fast enough. One way to verify this is to enable TRACE 
logging because then Flink will log for every CompletedCheckpoint when it gets 
discarded. The line should look like this "Executing discard procedure for 
Checkpoint". The high number of chk-X folders on S3 could be the result of the 
slow discard operations.

If you want then we can also take a look at the logs and ideally also the heap 
dump if you can share them with us.

I think one difference between Flink 1.10.0 and 1.7.2 is that we are using a 
fixed thread pool for running the io operations in 1.10.0. The number of 
threads equals the number of cores. In contrast, in Flink 1.7.2 we used a fork 
join pool with a max parallelism of 64. This difference could explain the lower 
throughput of discard operations because fewer can happen in parallel.

Cheers,
Till

On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER 
mailto:maleger...@gmail.com>> wrote:
Hello Yun,

Thank you for your feedback, please find below my answers to your questions:

1. I am using incremental state checkpointing with RocksDB backend and AWS S3 
as a distributed file system, everything is configured in flink-conf.yaml as 
follows:

state.backend: rocksdb
state.backend.incremental: true
# placeholders are replaced at deploy time
state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
state.backend.rocksdb.localdir: /home/data/flink/rocksdb

Size of the _metadata file in a checkpoint folder for the 3 running jobs:
- job1: 64KB
- job2: 1K
- job3: 10K

By the way, I have between 1 and 2 "chk-X" folders per job in S3.

2. Checkpointing is configured to be triggered every second for all the jobs. 
Only the interval is set, otherwise everything is kept as default:

executionEnvironment.enableCheckpointing(1000);

Best Regards,
Marc

On Wed, Apr 8, 2020 at 8:48 PM, Yun Tang 
mailto:myas...@live.com>> wrote:
Hi Marc

I think the occupied memory is due to the to-remove complete checkpoints which 
are stored in the workQueue of io-executor [1] in 
ZooKeeperCompletedCheckpointStore [2]. One clue to prove this is that 
Executors#newFixedThreadPool would create a ThreadPoolExecutor with a 
LinkedBlockingQueue to store runnables.

To figure out the root cause, would you please check the information below:

  1.  How large of your checkpoint meta, you could view 
{checkpoint-dir}/chk-X/_metadata to know the size, you could provide what state 
backend you use to help know this.
  2.  What is the interval of your checkpoints, a smaller checkpoint interval 
might accumulate many completed checkpoints to subsume once a newer checkpoint 
completes.

[1] 
https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
[2] 
https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234

Best
Yun Tang


From: Marc LEGER mailto:maleger...@gmail.com>>
Sent: Wednesday, April 8, 2020 16:50
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: Possible memory leak in JobManager (Flink 1.10.0)?

Hello,

I am currently testing Flink 1.10.0 but I am facing memory issues with 
JobManagers deployed in a standalone cluster configured in HA mode with 3 
TaskManagers (and 3 running jobs).
I do not reproduce the same issues using Flink 1.7.2.

Basically, whatever the value of "jobmanager.heap.size" property is (I tried 
with 2 GB, then 4GB and finally 8GB), the leader JobManager process is 
eventually consuming all available memory and is hanging after a few hours or 
days (depending on the size of the heap) before being deassociated from the 
cluster.

I am using OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
openjdk version "11.0.6" 2020-01-14
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux amd64-64-Bit 
Compressed

I performe

Re: Re: flink sql client not able to read parquet format table

2020-04-09 Thread wangl...@geekplus.com.cn

I am using the newest 1.10 blink planner.

Perhaps it is because of the method i used to write the parquet file.

I receive a Kafka message, transform each message into a Java class object, 
write the object to HDFS using StreamingFileSink, and add the HDFS path as a 
partition of the Hive table.

No matter what the order of the field descriptions in the Hive DDL statement 
is, the Hive client will work, as long as the field names are the same as the 
Java object field names.
But the Flink SQL client will not work.

DataStream<RobotUploadData0101> sourceRobot = source.map(x -> transform(x));
final StreamingFileSink<RobotUploadData0101> sink = StreamingFileSink
    .forBulkFormat(
        new Path("hdfs://172.19.78.38:8020/user/root/wanglei/robotdata/parquet"),
        ParquetAvroWriters.forReflectRecord(RobotUploadData0101.class))
    .build();
For example, RobotUploadData0101 has two fields: robotId (int) and robotTime (long).

CREATE TABLE `robotparquet`(`robotid` int, `robottime` bigint) and
CREATE TABLE `robotparquet`(`robottime` bigint, `robotid` int)
behave the same for the Hive client, but differently for the Flink SQL client.

Is this expected behavior?

Thanks,
Lei
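
One way to check this is to look at the Avro schema that ParquetAvroWriters.forReflectRecord derives from the Java class, since that schema determines the column order in the written Parquet files. The sketch below uses a hypothetical stand-in for RobotUploadData0101; comparing its field order with the Hive DDL column order may help explain why a reader that matches columns by position behaves differently from one that matches by name.

```
// A minimal sketch (hypothetical POJO standing in for RobotUploadData0101) that prints
// the reflect-derived Avro schema, i.e. the column layout of the written Parquet files.
import org.apache.avro.Schema;
import org.apache.avro.reflect.ReflectData;

public class ReflectSchemaSketch {

    public static class RobotUploadData0101 {
        public int robotId;
        public long robotTime;
    }

    public static void main(String[] args) {
        Schema schema = ReflectData.get().getSchema(RobotUploadData0101.class);
        // Fields appear in the order derived from the class definition,
        // which is the order ParquetAvroWriters.forReflectRecord writes them in.
        System.out.println(schema.toString(true));
    }
}
```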



wangl...@geekplus.com.cn 

 
From: Jark Wu
Date: 2020-04-09 14:48
To: wangl...@geekplus.com.cn; Jingsong Li; lirui
CC: user
Subject: Re: fink sql client not able to read parquet format table
Hi Lei,

Are you using the newest 1.10 blink planner? 

I'm not familiar with Hive and parquet, but I know @Jingsong Li and 
@li...@apache.org are experts on this. Maybe they can help on this question. 

Best,
Jark

On Tue, 7 Apr 2020 at 16:17, wangl...@geekplus.com.cn 
 wrote:

Hive table stored as parquet.

Under hive client: 
hive> select robotid from robotparquet limit 2;
OK
1291097
1291044


But under flink sql-client the result is 0
Flink SQL> select robotid  from robotparquet limit 2;
  robotid
 0
 0

Any insight on this?

Thanks,
Lei





wangl...@geekplus.com.cn 



Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Arvid Heise
I was wondering if you could actually use the AsyncWaitOperator in the
following way.

- Use a rather big timeout (so if callbacks usually take 1s, use 10s).
- Use UNORDERED mode.
- Use a rather big queue size that would not cause any backpressure (you
could just experiment with different settings).

Then, you'd probably get to the operator that you would need to implement
manually anyways.
- Requests come in a specific order, that order is retained when calling
the external library.
- Results are immediately returned (depending on your watermark settings)
resulting in no additional latency (because of UNORDERED).
- The big timeouts guarantee that you will not dismiss a certain input too
quickly, if the callback takes longer than usual. It will clean up all
elements from state that have no callbacks after the given time though.
- The big queue size will avoid backpressure resulting from many pending
requests without a response. Say you have 100 requests per second and a
timeout of 10s: a queue size of 1000 would then allow all incoming
requests to be processed almost instantly (ignoring the actual callbacks,
which decrease the needed queue size, since you said they are a rather rare
event), as sketched below.
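
A minimal sketch of this setup follows. The callback client and its method names are hypothetical placeholders for the multithreaded library; the timeout, capacity, and transformation are just the illustrative values from the bullets above, not a definitive implementation.

```
// A sketch: a callback-based client wrapped in an AsyncFunction and executed via
// AsyncDataStream.unorderedWait with a generous timeout and queue capacity.
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class CallbackAsyncSketch {

    // Hypothetical callback-based client standing in for the multithreaded library.
    interface CallbackClient {
        void submit(String request, Consumer<String> onResult);
    }

    public static class LibraryAsyncFunction extends RichAsyncFunction<String, String> {
        private transient CallbackClient client;

        @Override
        public void open(Configuration parameters) {
            // Placeholder "library": echoes the input in upper case via a callback.
            client = (request, onResult) -> onResult.accept(request.toUpperCase());
        }

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
            // The library's callback completes the Flink future whenever it fires.
            client.submit(input, result -> resultFuture.complete(Collections.singleton(result)));
        }

        @Override
        public void timeout(String input, ResultFuture<String> resultFuture) {
            // After the (large) timeout, emit nothing; the pending element is cleaned up.
            resultFuture.complete(Collections.emptyList());
        }
    }

    public static DataStream<String> apply(DataStream<String> input) {
        // UNORDERED mode, 10s timeout, queue capacity of 1000, as discussed above.
        return AsyncDataStream.unorderedWait(
                input, new LibraryAsyncFunction(), 10, TimeUnit.SECONDS, 1000);
    }
}
```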

On Thu, Apr 9, 2020 at 11:09 AM Salva Alcántara 
wrote:

> Perfectly understood, thanks a lot for your reply/patience . I will take a
> look at AsyncWaitOperator and adapt from there if I really need that.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Parquet S3 Sink Part files are not rolling over with checkpoint

2020-04-09 Thread Kostas Kloudas
Hi Roshan,

Your logs refer to a simple run without any failures or re-running
from a savepoint, right?

I am asking because I am trying to reproduce it by running a modified
ParquetStreamingFileSinkITCase [1] and so far I cannot.
The ITCase runs against the local filesystem, and not S3, but I added
the OutputFileConfig, and it seems that the part counter increases
as expected.

Is there any other information that would help us reproduce the issue?

Cheers,
Kostas

[1] 
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java

On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose  wrote:
>
> Hi,
>
> I am trying to get the parquet writer to write to s3; however, the files do 
> not seem to be rolling over. The same file "part-0-0.parquet" is being 
> created each time. Like the 'partCounter" is not being updated? Maybe the 
> Bucket is being recreated each time? I don't really know... Here are some 
> logs:
>
> 2020-04-09 01:28:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=2 (max part counter=2).
> 2020-04-09 01:28:10,589 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=2.
> 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> 2020-04-09 01:29:10,350 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> checkpointing for checkpoint with id=3 (max part counter=3).
> 2020-04-09 01:29:10,520 INFO 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 
> received completion notification for checkpoint with id=3.
> 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer 
> - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID 
> Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> And a part of my code:
>
> ```
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> //env.setParallelism(2);
> env.enableCheckpointing(6L);
> ///PROPERTIES Added
> Schema schema = bro_conn.getClassSchema();
>
> OutputFileConfig config = OutputFileConfig
> .builder()
> .withPartSuffix(".parquet")
> .build();
>
> final StreamingFileSink sink = StreamingFileSink
> .forBulkFormat(new Path("s3a:///bro_conn/"), 
> ParquetAvroWriters.forGenericRecord(schema))
> //.withRollingPolicy(OnCheckpointRollingPolicy.build())
> .withOutputFileConfig(config)
> //.withBucketAssigner(new PartitioningBucketAssigner())
> .build();
>
> DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis", new SimpleStringSchema(), consumerConfig));
>
> kinesis.flatMap(new JsonAvroParser())
> .addSink(sink);
>
>
> env.execute("Bro Conn");
>
> ```
>
> I'm using Flink 1.10.0, and running in Kubernetes. I also created a custom 
> image to add the presto/hadoop plugin.
>
> Thanks again!


Re: Possible memory leak in JobManager (Flink 1.10.0)?

2020-04-09 Thread Till Rohrmann
Thanks for reporting this issue, Marc. From what you've reported, I think
Yun is right and that the large memory footprint is caused by
CompletedCheckpoints which cannot be removed fast enough. One way to verify
this is to enable TRACE logging, because Flink will then log a line for every
CompletedCheckpoint when it gets discarded. The line should look like this:
"Executing discard procedure for Checkpoint". The high number of chk-X
folders on S3 could be the result of the slow discard operations.
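For reference, a minimal sketch of what that could look like with the default
log4j.properties shipped in conf/ (scoping the logger to the whole
org.apache.flink.runtime.checkpoint package is an assumption here; a narrower
logger name may be enough):

```
# Hedged sketch: surface the per-checkpoint discard TRACE lines.
log4j.logger.org.apache.flink.runtime.checkpoint=TRACE
```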

If you want then we can also take a look at the logs and ideally also the
heap dump if you can share them with us.

I think one difference between Flink 1.10.0 and 1.7.2 is that we are using
a fixed thread pool for running the I/O operations in 1.10.0. The number of
threads equals the number of cores. In contrast, in Flink 1.7.2 we used a
fork-join pool with a max parallelism of 64. This difference could explain
the lower throughput of discard operations, because fewer of them can happen
in parallel.
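Roughly, the difference boils down to something like the following (a
simplified standalone sketch of the two pool configurations, not the actual
Flink code):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

public class IoPoolComparison {
    public static void main(String[] args) {
        // Flink 1.10.0: a fixed pool sized to the number of cores, so only
        // that many checkpoint-discard operations can run concurrently.
        ExecutorService fixedPool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

        // Flink 1.7.2: a fork-join pool with parallelism 64, allowing far
        // more discard operations to be in flight at the same time.
        ForkJoinPool forkJoinPool = new ForkJoinPool(64);

        fixedPool.shutdown();
        forkJoinPool.shutdown();
    }
}
```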

Cheers,
Till

On Thu, Apr 9, 2020 at 10:09 AM Marc LEGER  wrote:

> Hello Yun,
>
> Thank you for your feedback, please find below my answers to your
> questions:
>
> 1. I am using incremental state checkpointing with RocksDB backend and AWS
> S3 as a distributed file system, everything is configured in
> flink-conf.yaml as follows:
>
> state.backend: rocksdb
> state.backend.incremental: true
> # placeholders are replaced at deploy time
> state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
> state.backend.rocksdb.localdir: /home/data/flink/rocksdb
>
> Size of _metadata file in a checkpoint folder for the 3 running jobs:
> - job1: 64KB
> - job2: 1K
> - job3: 10K
>
> By the way, I have between 1 and 2 "chk-X" folders per job in S3.
>
> 2. Checkpointing is configured to be triggered every second for all the
> jobs. Only the interval is set, otherwise everything is kept as default:
>
> executionEnvironment.enableCheckpointing(1000);
>
> Best Regards,
> Marc
>
> Le mer. 8 avr. 2020 à 20:48, Yun Tang  a écrit :
>
>> Hi Marc
>>
>> I think the occupied memory is due to the to-remove complete checkpoints
>> which are stored in the workQueue of io-executor [1] in
>> ZooKeeperCompletedCheckpointStore [2]. One clue to prove this is that
>> Executors#newFixedThreadPool would create a ThreadPoolExecutor with a
>> LinkedBlockingQueue to store runnables.
>>
>> To figure out the root cause, would you please check the information
>> below:
>>
>>    1. How large is your checkpoint metadata? You could view
>>    {checkpoint-dir}/chk-X/_metadata to know the size; it would also help to
>>    know which state backend you use.
>>    2. What is the interval of your checkpoints? A smaller checkpoint
>>    interval might accumulate many completed checkpoints to subsume once a
>>    newer checkpoint completes.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
>> [2]
>> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234
>>
>> Best
>> Yun Tang
>>
>> --
>> *From:* Marc LEGER 
>> *Sent:* Wednesday, April 8, 2020 16:50
>> *To:* user@flink.apache.org 
>> *Subject:* Possible memory leak in JobManager (Flink 1.10.0)?
>>
>> Hello,
>>
>> I am currently testing Flink 1.10.0 but I am facing memory issues with
>> JobManagers deployed in a standalone cluster configured in HA mode with 3
>> TaskManagers (and 3 running jobs).
>> I do not reproduce the same issues using Flink 1.7.2.
>>
>> Basically, whatever the value of "jobmanager.heap.size" property is (I
>> tried with 2 GB, then 4GB and finally 8GB), the leader JobManager process
>> is eventually consuming all available memory and is hanging after a few
>> hours or days (depending on the size of the heap) before being disassociated
>> from the cluster.
>>
>> I am using OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
>> openjdk version "11.0.6" 2020-01-14
>> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
>> Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux
>> amd64-64-Bit Compressed
>>
>> I performed a heap dump for analysis on the JobManager Java process and
>> generated a "Leak Suspects" report using Eclipse MAT.
>> The tool is detecting one main suspect (cf. attached screenshots):
>>
>> One instance of "java.util.concurrent.ThreadPoolExecutor" loaded by
>> "" occupies 580,468,280 (92.82%) bytes. The instance
>> is referenced by
>> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices @
>> 0x8041fb48 , loaded by "".
>>
>> Has anyone already faced such an issue ?
>>
>> Best Regards,
>> Marc
>>
>


Upgrade of Cassandra driver in connector

2020-04-09 Thread Thms Hmm
Hey,
are there any plans to upgrade the version of the Cassandra driver to a
newer one?

Currently driver v3.0 is integrated, which dates back to 2016, and we are
having problems getting it to work with AD authentication and Cassandra
v4.x.

Also, the shading makes it impossible to simply swap in a newer driver. Is
that correct?

Thanks for your help.

Kind regards
Thomas


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Salva Alcántara
Perfectly understood, thanks a lot for your reply/patience. I will take a
look at AsyncWaitOperator and adapt from there if I really need that.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Using multithreaded library within ProcessFunction with callbacks relying on the out parameter

2020-04-09 Thread Piotr Nowojski
Hi,

With:

> Can you not take into account the pending element that’s stuck somewhere in
> transit? Snapshot it as well and reprocess it during recovery? This is
> exactly what the AsyncWaitOperator is doing.

I didn’t mean for you to use the AsyncWaitOperator itself, but what both Arvid
and I suggested previously:

> Also as Kostas pointed out, the easiest way would be to try use 
> AsyncWaitOperator. If that’s not possible, you can implement your custom 
> logic based on its code.

You can copy/duplicate & modify/adjust the AsyncWaitOperator logic inside your
custom operator. You don’t have to use it as-is if you have special
requirements; you can implement your own custom logic. Specifically, I meant to
mimic the

org.apache.flink.streaming.api.operators.async.AsyncWaitOperator#queue

field and how it is used during state snapshotting & recovery.
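For illustration only, a stripped-down sketch of that pattern (keep the
in-flight elements in a queue, snapshot the queue into operator state, and
re-submit its contents on restore); the String element type and the class name
are placeholders, and this is nowhere near the full AsyncWaitOperator logic:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class PendingBufferFunction extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    private transient ListState<String> checkpointedPending; // persisted copy of in-flight elements
    private transient Queue<String> pending;                 // in-memory view of the same elements

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        // Remember the element before handing it to the external/threaded library.
        pending.add(value);
        // ... submit 'value' to the library here and remove it from 'pending'
        // only once the library confirms it has been fully processed.
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist whatever is still in transit so it survives a failure.
        checkpointedPending.update(new ArrayList<>(pending));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedPending = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("pending-elements", String.class));
        pending = new ArrayDeque<>();
        if (context.isRestored()) {
            for (String element : checkpointedPending.get()) {
                pending.add(element);
                // ... re-submit the element to the library on recovery.
            }
        }
    }
}
```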

Piotrek

> On 9 Apr 2020, at 06:10, Salva Alcántara  wrote:
> 
> I agree with your point Piotrek, AsyncIO would handle all the pending data
> for me. However, the reason why I did not want to use it is that in my
> case, the callbacks are not always called in response to new data being sent
> to the third party lib. Indeed, the callback will be called rather
> uncommonly (since in my case it will mean that an anomaly has been
> detected). This means that if I go with AsyncIO I will need to set up a max
> timeout for every element, when only a few of them will actually invoke the
> callback (i.e., produce any data in response). This seems rather drastic
> because it will probably add too much latency unnecessarily, but I agree
> that maybe there is no other way if I need exactly-once guarantees.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Possible memory leak in JobManager (Flink 1.10.0)?

2020-04-09 Thread Marc LEGER
Hello Yun,

Thank you for your feedback, please find below my answers to your questions:

1. I am using incremental state checkpointing with RocksDB backend and AWS
S3 as a distributed file system, everything is configured in
flink-conf.yaml as follows:

state.backend: rocksdb
state.backend.incremental: true
# placeholders are replaced at deploy time
state.checkpoints.dir: s3://#S3_BUCKET#/#SERVICE_ID#/flink/checkpoints
state.backend.rocksdb.localdir: /home/data/flink/rocksdb

Size of _metadata file in a checkpoint folder for the 3 running jobs:
- job1: 64KB
- job2: 1K
- job3: 10K

By the way, I have between 1 and 2 "chk-X" folders per job in S3.

2. Checkpointing is configured to be triggered every second for all the
jobs. Only the interval is set, otherwise everything is kept as default:

executionEnvironment.enableCheckpointing(1000);
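For reference, a minimal sketch of how the checkpoint interval could be spaced
out via CheckpointConfig while investigating (the concrete values are
illustrative only, not a recommendation):

```java
// Hedged sketch: same job, but with checkpoints triggered less often and an
// enforced pause between them, assuming the job can tolerate the larger gap.
executionEnvironment.enableCheckpointing(10_000);
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
```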

Best Regards,
Marc

Le mer. 8 avr. 2020 à 20:48, Yun Tang  a écrit :

> Hi Marc
>
> I think the occupied memory is due to the to-remove complete checkpoints
> which are stored in the workQueue of io-executor [1] in
> ZooKeeperCompletedCheckpointStore [2]. One clue to prove this is that
> Executors#newFixedThreadPool would create a ThreadPoolExecutor with a
> LinkedBlockingQueue to store runnables.
>
> To figure out the root cause, would you please check the information below:
>
>    1. How large is your checkpoint metadata? You could view
>    {checkpoint-dir}/chk-X/_metadata to know the size; it would also help to
>    know which state backend you use.
>    2. What is the interval of your checkpoints? A smaller checkpoint
>    interval might accumulate many completed checkpoints to subsume once a
>    newer checkpoint completes.
>
>
> [1]
> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L260
> [2]
> https://github.com/apache/flink/blob/d7e247209358779b6485062b69965b83043fb59d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L234
>
> Best
> Yun Tang
>
> --
> *From:* Marc LEGER 
> *Sent:* Wednesday, April 8, 2020 16:50
> *To:* user@flink.apache.org 
> *Subject:* Possible memory leak in JobManager (Flink 1.10.0)?
>
> Hello,
>
> I am currently testing Flink 1.10.0 but I am facing memory issues with
> JobManagers deployed in a standalone cluster configured in HA mode with 3
> TaskManagers (and 3 running jobs).
> I do not reproduce the same issues using Flink 1.7.2.
>
> Basically, whatever the value of "jobmanager.heap.size" property is (I
> tried with 2 GB, then 4GB and finally 8GB), the leader JobManager process
> is eventually consuming all available memory and is hanging after a few
> hours or days (depending on the size of the heap) before being disassociated
> from the cluster.
>
> I am using OpenJ9 JVM with Java 11 on CentOS 7.6 machines:
> openjdk version "11.0.6" 2020-01-14
> OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.6+10)
> Eclipse OpenJ9 VM AdoptOpenJDK (build openj9-0.18.1, JRE 11 Linux
> amd64-64-Bit Compressed
>
> I performed a heap dump for analysis on the JobManager Java process and
> generated a "Leak Suspects" report using Eclipse MAT.
> The tool is detecting one main suspect (cf. attached screenshots):
>
> One instance of "java.util.concurrent.ThreadPoolExecutor" loaded by
> "" occupies 580,468,280 (92.82%) bytes. The instance
> is referenced by
> org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices @
> 0x8041fb48 , loaded by "".
>
> Has anyone already faced such an issue ?
>
> Best Regards,
> Marc
>


Re: Complex graph-based sessionization (potential use for stateful functions)

2020-04-09 Thread Igal Shilman
Hi All,
One way to think about it with StateFun is to represent the graph
vertices as stateful function instances. Unlike in other frameworks, an
instance of a function does not take any resources while idle, and
you can potentially have many millions of those.
The state for each vertex might be a list of adjacent vertices, and
potentially a timer so that they won’t linger for too long.
You would still have to think about what kind of graph algorithm to apply here.
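As a very rough sketch of that idea with the StateFun 2.0 Java SDK (the
message types and names below are made up purely for illustration):

```java
import java.time.Duration;
import java.util.HashSet;

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

// Logically, one instance of this function exists per graph vertex id.
public class VertexFunction implements StatefulFunction {

    public static final FunctionType TYPE = new FunctionType("example", "vertex");

    // Adjacency list of this vertex; a raw HashSet keeps the sketch short.
    @Persisted
    private final PersistedValue<HashSet> neighbours =
            PersistedValue.of("neighbours", HashSet.class);

    @Override
    @SuppressWarnings("unchecked")
    public void invoke(Context context, Object input) {
        if (input instanceof AddEdge) {
            HashSet<String> adjacency = neighbours.getOrDefault(new HashSet<>());
            adjacency.add(((AddEdge) input).targetVertexId);
            neighbours.set(adjacency);
            // Schedule an expiration message so idle vertices do not linger forever.
            context.sendAfter(Duration.ofHours(1), context.self(), new Expire());
        } else if (input instanceof Expire) {
            neighbours.clear();
        }
    }

    // Made-up message types for the sketch.
    public static final class AddEdge {
        public final String targetVertexId;
        public AddEdge(String targetVertexId) { this.targetVertexId = targetVertexId; }
    }

    public static final class Expire {}
}
```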

I hope it helps,
Igal.

On Thursday, April 9, 2020, Robert Metzger  wrote:

> Hey Max,
>
> 1) Stateful functions has been released now:
> https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-core
> See also:
> https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
> Getting Started:
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/getting-started/java_walkthrough.html
>
> Please give us honest feedback on the "onboarding experience" with
> stateful functions. We are very eager to make the experience as smooth as
> possible :)
>
> 2) What do you consider large state? The Flink runtime itself can handle
> large events / messages (1GB should work). I'm not sure about statefun, but
> I will try to get an answer for you :)
>
> Best,
> Robert
>
>
> On Tue, Apr 7, 2020 at 9:31 AM m@xi  wrote:
>
>> Hello Robert
>>
>> Thanks to your reply I discovered the Stateful Functions which I believe
>> is
>> a quite powerful tool. I have some questions:
>>
>> 1) As you said, "the community is currently in the process of releasing
>> the
>> first Apache release of StateFun and it should hopefully be out by the end
>> of this week". Does this mean that it will become available in Maven
>> Repository?
>>
>> Because I can't find it while searching in
>> https://mvnrepository.com/artifact/org.apache.flink?sort=newest
>> or
>> use the API in my intellij project when I import the dependencies in my
>> POM
>> file.
>>
>> I thought of downloading the code from
>> https://ci.apache.org/projects/flink/flink-statefun-docs-master/,
>> compiling
>> it with *mvn clean package* and then import the produced jar file to my
>> intellij project as an External Library. Is this what you might recommend
>> for now?
>>
>> 2) I came across this tutorial by Stefan on stateful functions
>> https://www.youtube.com/watch?v=fCeHCMJXXM0 where he mentions that
>> arbitrary
>> communication between nodes/functions/actors is essentially made possible
>> by
>> introducing feedback loops to the DAG Flink topology (well now it has
>> circles I guess :P) to simulate the arbitrary messaging defined in the
>> StateFun topology.
>>
>> So the message passing is done by "feedback and tuple rerouting" and not
>> with MPI. Do you think (or have you tested) whether one may *efficiently*
>> send/receive (potentially large) state, like graph state which is the use
>> case of this post? Or is it more suitable for sending control messages
>> between actors/functions?
>>
>> Thanks a lot in advance.
>>
>> Best,
>> Max
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: ListState with millions of elements

2020-04-09 Thread Aljoscha Krettek

On 08.04.20 20:14, Seth Wiesman wrote:

There is a limitation in RocksDB's JNI bridge that will cause applications
to fail if list state exceeds 2GB. I am not aware of anyone working on this
issue.


That is correct, and here's the Jira issue for it: 
https://issues.apache.org/jira/browse/FLINK-6761


There's also another issue about changing RocksDB list state to not 
store the whole ListState in a single RocksDB value but instead store 
the individual ListState entries in individual RocksDB values, under a 
specially formed "sequential" key. This is the issue: 
https://issues.apache.org/jira/browse/FLINK-8297
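As a possible workaround until then, the entries can be spread over MapState
so that every element ends up in its own RocksDB key/value. A hedged sketch
(the element type and names are illustrative):

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keeps a per-key "list" as (index -> element) entries, so no single RocksDB
// value has to hold the whole list and the 2GB JNI limit is not hit.
public class MapBackedListFunction extends KeyedProcessFunction<String, String, String> {

    private transient MapState<Long, String> elements;
    private transient ValueState<Long> size;

    @Override
    public void open(Configuration parameters) {
        elements = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("elements", Long.class, String.class));
        size = getRuntimeContext().getState(
                new ValueStateDescriptor<>("size", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long nextIndex = size.value() == null ? 0L : size.value();
        elements.put(nextIndex, value);   // each entry becomes a separate RocksDB key/value
        size.update(nextIndex + 1);
    }
}
```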


Best,
Aljoscha


Re: Upgrading Flink

2020-04-09 Thread Robert Metzger
Hey Stephen,

1. You should be able to migrate from 1.8 to 1.10:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html#compatibility-table

2. Yes, you need to recompile (but ideally you don't need to change
anything).



On Mon, Apr 6, 2020 at 10:19 AM Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

> Quick questions on upgrading Flink.
>
> All our jobs are compiled against Flink 1.8.x
>
> We are planning to upgrade to 1.10.x
>
> 1. Is the recommended path to upgrade one minor version at a time, i.e. 1.8.x
> -> 1.9.x and then 1.9.x -> 1.10.x as a second step, or is the big jump
> supported, i.e. 1.8.x -> 1.10.x in one change?
>
> 2. Do we need to recompile the jobs against the newer Flink version before
> upgrading? Coordinating multiple teams can be tricky, so - short of
> spinning up a second flink cluster - our continuous deployment
> infrastructure will try to deploy the topologies compiled against 1.8.x for
> an hour or two after we have upgraded the cluster
>


Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-09 Thread Robert Metzger
Hey,
Others have experienced this as well, yes:
https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
I have also notified the Hadoop project about this issue:
https://issues.apache.org/jira/browse/HADOOP-15915

I agree with Congxian: You could try reaching out to the Hadoop user@ list
for additional help. Maybe logging on DEBUG level helps already?
If you are up for an adventure, you could also consider adding some
debugging code into Hadoop's DiskChecker and compiling a custom Hadoop
version.

Best,
Robert


On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu  wrote:

> Hi LU
>
> I'm not familiar with the S3 file system; maybe others in the Flink community
> can help you in this case, or you could also reach out to the S3
> team/community for help.
>
> Best,
> Congxian
>
>
> Lu Niu  于2020年4月8日周三 上午11:05写道:
>
>> Hi, Congxiao
>>
>> Thanks for replying. Yeah, I also found those references. However, as I
>> mentioned in the original post, there is enough capacity on all disks. Also,
>> when I switch to the Presto file system, the problem goes away. Wondering
>> whether others have encountered a similar issue.
>>
>> Best
>> Lu
>>
>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>> wrote:
>>
>>> Hi
>>> From the stack, it seems the problem is that "org.apache.flink.fs.shaded.
>>> hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException: Could
>>> not find any valid local directory for s3ablock-0001-". I googled the
>>> exception and found a related page [1]; could you please make sure
>>> there is enough space on the local disk.
>>>
>>> [1]
>>> https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
>>> Best,
>>> Congxian
>>>
>>>
>>> Lu Niu  于2020年4月8日周三 上午8:41写道:
>>>
 Hi, flink users

 Did anyone encounter such an error? The error comes from S3AFileSystem.
 But there is no capacity issue on any disk. We are using Hadoop 2.7.1.
 ```

 Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
 Could not open output stream for state backend
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
 org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
at 
 org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:47)
at 
 org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
... 3 more
 Caused by: java.io.IOException: Could not open output stream for state 
 backend
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at 
 org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
at 
 org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at 
 java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
 org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
at 
 java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at 
 java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
at 
 org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:320)
at 
 org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.callInternal(RocksIncrementalSnapshotStrategy.java:263)
at 
 org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
  

Re: Fwd: Complex graph-based sessionization (potential use for stateful functions)

2020-04-09 Thread Robert Metzger
Hey Max,

1) Stateful functions has been released now:
https://mvnrepository.com/artifact/org.apache.flink/statefun-flink-core
See also:
https://flink.apache.org/news/2020/04/07/release-statefun-2.0.0.html
Getting Started:
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/getting-started/java_walkthrough.html

Please give us honest feedback on the "onboarding experience" with stateful
functions. We are very eager to make the experience as smooth as possible
:)

2) What do you consider large state? The Flink runtime itself can handle
large events / messages (1GB should work). I'm not sure about statefun, but
I will try to get an answer for you :)

Best,
Robert


On Tue, Apr 7, 2020 at 9:31 AM m@xi  wrote:

> Hello Robert
>
> Thanks to your reply I discovered the Stateful Functions which I believe is
> a quite powerful tool. I have some questions:
>
> 1) As you said, "the community is currently in the process of releasing the
> first Apache release of StateFun and it should hopefully be out by the end
> of this week". Does this mean that it will become available in Maven
> Repository?
>
> Because I can't find it while searching in
> https://mvnrepository.com/artifact/org.apache.flink?sort=newest
> or
> use the API in my intellij project when I import the dependencies in my POM
> file.
>
> I thought of downloading the code from
> https://ci.apache.org/projects/flink/flink-statefun-docs-master/,
> compiling
> it with *mvn clean package* and then import the produced jar file to my
> intellij project as an External Library. Is this what you might recommend
> for now?
>
> 2) I came across this tutorial by Stefan on stateful functions
> https://www.youtube.com/watch?v=fCeHCMJXXM0 where he mentions that
> arbitrary
> communication between nodes/functions/actors is essentially made possible
> by
> introducing feedback loops to the DAG Flink topology (well now it has
> circles I guess :P) to simulate the arbitrary messaging defined in the
> StateFun topology.
>
> So the message passing is done by "feedback and tuple rerouting" and not
> with MPI. Do you think (or have you tested) whether one may *efficiently*
> send/receive (potentially large) state, like graph state which is the use
> case of this post? Or is it more suitable for sending control messages
> between actors/functions?
>
> Thanks a lot in advance.
>
> Best,
> Max
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>